use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties};
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, Transaction};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam, TwoPhaseCommitCoordinator,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

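/// Options shared by every DeltaLake sink, regardless of the backing store
/// (S3, GCS, or the local file system).
///
/// An illustrative `CREATE SINK` using these options; the option names are taken
/// from this module, while the values and exact SQL shape are assumptions to adapt
/// to your deployment:
///
/// ```sql
/// CREATE SINK delta_sink FROM mv WITH (
///     connector = 'deltalake',
///     type = 'append-only',
///     location = 's3a://my-bucket/my-table',
///     s3.access.key = '...',
///     s3.secret.key = '...',
///     s3.region = 'us-east-1',
///     commit_checkpoint_interval = '10'
/// );
/// ```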
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required for Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

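    /// Classifies `location` by URL scheme: `s3://` and `s3a://` map to
    /// [`DeltaTableUrl::S3`], `gs://` to [`DeltaTableUrl::Gcs`], and `file://` to
    /// [`DeltaTableUrl::Local`] (with the prefix stripped). Anything else is rejected.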
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

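    /// Builds the `storage_options` map handed to `deltalake` for S3-backed tables.
    /// Credentials are resolved via [`AwsAuthProps`]; at least one of `s3.endpoint`
    /// and `s3.region` must be set, with [`DEFAULT_REGION`] as the region fallback.
    /// HTTP endpoints and unsafe renames are enabled unconditionally.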
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

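/// Full option set for the DeltaLake sink: the shared [`DeltaLakeCommon`] options
/// plus the sink `type` (only append-only variants are accepted) and the optional
/// `is_exactly_once` flag, which switches the coordinator to two-phase commit.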
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,

    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

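/// Append-only DeltaLake sink. Parallel writers convert [`StreamChunk`]s into Arrow
/// record batches and stage Parquet files; a single commit coordinator turns the
/// resulting `Add` actions into Delta log commits on checkpoint barriers.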
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

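/// Checks that a RisingWave [`DataType`] is compatible with the corresponding
/// DeltaLake column type. The accepted primitive pairs are:
///
/// | RisingWave    | DeltaLake    |
/// |---------------|--------------|
/// | `Boolean`     | `Boolean`    |
/// | `Int16`       | `Short`      |
/// | `Int32`       | `Integer`    |
/// | `Int64`       | `Long`       |
/// | `Float32`     | `Float`      |
/// | `Float64`     | `Double`     |
/// | `Decimal`     | `Decimal(_)` |
/// | `Date`        | `Date`       |
/// | `Varchar`     | `String`     |
/// | `Timestamptz` | `Timestamp`  |
///
/// Structs match field-by-field on both name and type, lists match on the element
/// type, and any other RisingWave type is rejected with an error.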
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

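// `Sink` wiring: each writer is wrapped in a `CoordinatedLogSinker` so flushes align
// with checkpoint barriers, and the staged `Add` actions are committed by a single
// coordinator (single-phase by default, two-phase when `is_exactly_once` is set).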
impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only DeltaLake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Column count mismatch: RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
            app_id: format!("risingwave-deltalake-{}", self.param.sink_id),
            exactly_once: self.config.is_exactly_once.unwrap_or_default(),
        };
        if coordinator.exactly_once {
            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
        } else {
            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
        }
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

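/// Per-parallelism sink writer: appends rows to the table's `RecordBatchWriter` and
/// flushes the staged files into `Add` actions when a checkpoint barrier arrives.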
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema = writer.arrow_schema();

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(batch).await?;
        Ok(())
    }
}

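// `SinkWriter` contract: rows are written on every `write_batch`, but staged files
// are only flushed, and reported to the coordinator as serialized metadata, when
// `barrier` is called with `is_checkpoint == true`.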
#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

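/// Commit coordinator shared by both commit modes. In single-phase mode the collected
/// `Add` actions are committed directly. In two-phase (exactly-once) mode each commit
/// also records an application transaction (`app_id`, `version` = epoch) in the Delta
/// log, so a replayed commit for an already-persisted epoch is detected by
/// `is_txn_committed` and skipped.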
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
    app_id: String,
    exactly_once: bool,
}

impl DeltaLakeSinkCommitter {
    fn collect_write_adds(metadata: &[SinkMetadata]) -> Result<Vec<Add>> {
        Ok(metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .collect())
    }

    fn delta_txn_version(epoch: u64) -> Result<i64> {
        Ok(i64::try_from(epoch).context("delta lake epoch exceeds i64 range")?)
    }

    async fn commit_actions(
        &mut self,
        epoch: u64,
        adds: Vec<Add>,
        txn_identity: Option<&DeltaLakeTxnIdentity>,
    ) -> Result<()> {
        if adds.is_empty() {
            return Ok(());
        }

        if let Some(txn_identity) = txn_identity
            && self.is_txn_committed(txn_identity).await?
        {
            tracing::info!(
                "DeltaLake epoch {epoch} already committed for app id {}, txn version {}; skipping commit.",
                txn_identity.app_id,
                txn_identity.version
            );
            return Ok(());
        }

        let write_adds = adds.into_iter().map(Action::Add).collect();

        let partition_cols = self
            .table
            .snapshot()?
            .metadata()
            .partition_columns()
            .to_vec();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let commit_builder = if let Some(txn_identity) = txn_identity {
            let commit_builder: CommitBuilder = CommitProperties::default()
                .with_application_transaction(Transaction::new(
                    &txn_identity.app_id,
                    txn_identity.version,
                ))
                .into();
            commit_builder
        } else {
            CommitBuilder::default()
        };
        let version = commit_builder
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update_state().await?;
        tracing::debug!("Committed to DeltaLake table in epoch {epoch}, version {version}.");
        Ok(())
    }

    async fn is_txn_committed(&mut self, txn_identity: &DeltaLakeTxnIdentity) -> Result<bool> {
        self.table.update_state().await?;
        let log_store = self.table.log_store();
        let committed_version = self
            .table
            .snapshot()?
            .transaction_version(log_store.as_ref(), &txn_identity.app_id)
            .await?;
        Ok(committed_version.is_some_and(|version| version >= txn_identity.version))
    }
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");

        let adds = Self::collect_write_adds(&metadata)?;
        self.commit_actions(epoch, adds, None).await
    }
}

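// Two-phase variant used when `is_exactly_once = true`: `pre_commit` serializes the
// `Add` actions together with the transaction identity, and `commit_data` replays
// them idempotently via `commit_actions`.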
#[async_trait::async_trait]
impl TwoPhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        _schema_change: Option<risingwave_pb::stream_plan::PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>> {
        tracing::debug!("Starting DeltaLake pre-commit in epoch {epoch}.");

        let adds = Self::collect_write_adds(&metadata)?;
        if adds.is_empty() {
            return Ok(None);
        }

        let txn_identity = DeltaLakeTxnIdentity {
            app_id: self.app_id.clone(),
            version: Self::delta_txn_version(epoch)?,
        };
        Ok(Some(
            DeltaLakePreCommitMetadata { adds, txn_identity }.try_into_bytes()?,
        ))
    }

    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
        tracing::debug!("Starting DeltaLake exactly-once commit in epoch {epoch}.");

        if commit_metadata.is_empty() {
            return Ok(());
        }

        let pre_commit_metadata = DeltaLakePreCommitMetadata::try_from_bytes(&commit_metadata)?;
        self.commit_actions(
            epoch,
            pre_commit_metadata.adds,
            Some(&pre_commit_metadata.txn_identity),
        )
        .await
    }

    async fn abort(&mut self, epoch: u64, _commit_metadata: Vec<u8>) {
        tracing::debug!("Abort not implemented yet for DeltaLake epoch {epoch}");
    }
}

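// Wire formats exchanged with the coordinator: `DeltaLakeWriteResult` carries a
// writer's flushed `Add` actions; `DeltaLakePreCommitMetadata` additionally pins the
// transaction identity (`app_id` plus the epoch-derived version) for exactly-once
// replay detection.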
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

#[derive(Serialize, Deserialize)]
struct DeltaLakePreCommitMetadata {
    adds: Vec<Add>,
    txn_identity: DeltaLakeTxnIdentity,
}

#[derive(Serialize, Deserialize)]
struct DeltaLakeTxnIdentity {
    app_id: String,
    version: i64,
}

impl DeltaLakePreCommitMetadata {
    fn try_into_bytes(self) -> Result<Vec<u8>> {
        Ok(serde_json::to_vec(&self).context("cannot serialize deltalake pre-commit metadata")?)
    }

    fn try_from_bytes(value: &[u8]) -> Result<Self> {
        Ok(serde_json::from_slice(value)
            .context("cannot deserialize deltalake pre-commit metadata")?)
    }
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("cannot deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("cannot create deltalake sink write result from empty data")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

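// End-to-end tests against a local `file://` table: one exercises the single-phase
// commit path, the other the two-phase exactly-once path, including an idempotent
// re-commit of the same epoch.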
#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::writer::SinkWriter;
    use crate::sink::{SinglePhaseCommitCoordinator, TwoPhaseCommitCoordinator};

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
            app_id: "test-single-phase".to_owned(),
            exactly_once: false,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        SinglePhaseCommitCoordinator::commit_data(&mut committer, 1, vec![metadata])
            .await
            .unwrap();
        let snapshot = committer.table.snapshot().unwrap();
        assert_eq!(1, snapshot.log_data().num_files());
    }

    #[tokio::test]
    async fn test_deltalake_exactly_once() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
            "is_exactly_once".to_owned() => "true".to_owned(),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();

        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
            app_id: "test-exactly-once".to_owned(),
            exactly_once: true,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        let pre_commit_metadata =
            TwoPhaseCommitCoordinator::pre_commit(&mut committer, 1, vec![metadata], None)
                .await
                .unwrap()
                .unwrap();

        TwoPhaseCommitCoordinator::commit_data(&mut committer, 1, pre_commit_metadata.clone())
            .await
            .unwrap();
        assert_eq!(committer.table.version(), Some(1));
        assert_eq!(
            committer.table.snapshot().unwrap().log_data().num_files(),
            1
        );

        let log_store = committer.table.log_store();
        let txn_version = committer
            .table
            .snapshot()
            .unwrap()
            .transaction_version(log_store.as_ref(), &committer.app_id)
            .await
            .unwrap();
        assert_eq!(txn_version, Some(1));

        TwoPhaseCommitCoordinator::commit_data(&mut committer, 1, pre_commit_metadata)
            .await
            .unwrap();
        assert_eq!(committer.table.version(), Some(1));
        assert_eq!(
            committer.table.snapshot().unwrap().log_data().num_files(),
            1
        );
    }
}