// risingwave_connector/sink/deltalake.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use core::num::NonZeroU64;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use deltalake::DeltaTable;
22use deltalake::aws::constants::{
23    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
24    AWS_SECRET_ACCESS_KEY,
25};
26use deltalake::kernel::transaction::{CommitBuilder, CommitProperties};
27use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, Transaction};
28use deltalake::protocol::{DeltaOperation, SaveMode};
29use deltalake::writer::{DeltaWriter, RecordBatchWriter};
30use phf::{Set, phf_set};
31use risingwave_common::array::StreamChunk;
32use risingwave_common::array::arrow::DeltaLakeConvert;
33use risingwave_common::bail;
34use risingwave_common::catalog::Schema;
35use risingwave_common::types::DataType;
36use risingwave_common::util::iter_util::ZipEqDebug;
37use risingwave_pb::connector_service::SinkMetadata;
38use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
39use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tokio::sync::mpsc::UnboundedSender;
43use url::Url;
44use with_options::WithOptions;
45
46use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
47use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
48use crate::sink::coordinate::CoordinatedLogSinker;
49use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
50use crate::sink::writer::SinkWriter;
51use crate::sink::{
52    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
53    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
54    SinkWriterParam, TwoPhaseCommitCoordinator,
55};
56
/// Fallback AWS region used when only `s3.endpoint` is configured.
pub const DEFAULT_REGION: &str = "us-east-1";
/// Storage-option key under which the GCS service account key is passed to delta-rs.
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

/// Connector name of the DeltaLake sink.
pub const DELTALAKE_SINK: &str = "deltalake";
61
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    /// URI of the Delta table: `s3://`/`s3a://`, `gs://`, or `file://`.
    #[serde(rename = "location")]
    pub location: String,
    /// AWS credential/region/endpoint options, flattened into the same `WITH` clause.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    /// Service account key, required when the table lives on Google Cloud Storage.
    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}
78
79impl EnforceSecret for DeltaLakeCommon {
80    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
81        "gcs.service.account",
82    };
83
84    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
85        AwsAuthProps::enforce_one(prop)?;
86        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
87            return Err(EnforceSecretError {
88                key: prop.to_owned(),
89            }
90            .into());
91        }
92
93        Ok(())
94    }
95}
96
97impl DeltaLakeCommon {
98    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
99        let table = match Self::get_table_url(&self.location)? {
100            DeltaTableUrl::S3(s3_path) => {
101                let storage_options = self.build_delta_lake_config_for_aws().await?;
102                deltalake::aws::register_handlers(None);
103                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
104                deltalake::open_table_with_storage_options(url, storage_options).await?
105            }
106            DeltaTableUrl::Local(local_path) => {
107                let url = Url::parse(&format!("file://{}", local_path))
108                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
109                deltalake::open_table(url).await?
110            }
111            DeltaTableUrl::Gcs(gcs_path) => {
112                let mut storage_options = HashMap::new();
113                storage_options.insert(
114                    GCS_SERVICE_ACCOUNT.to_owned(),
115                    self.gcs_service_account.clone().ok_or_else(|| {
116                        SinkError::Config(anyhow!(
117                            "gcs.service.account is required with Google Cloud Storage (GCS)"
118                        ))
119                    })?,
120                );
121                deltalake::gcp::register_handlers(None);
122                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
123                deltalake::open_table_with_storage_options(url, storage_options).await?
124            }
125        };
126        Ok(table)
127    }
128
129    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
130        if path.starts_with("s3://") || path.starts_with("s3a://") {
131            Ok(DeltaTableUrl::S3(path.to_owned()))
132        } else if path.starts_with("gs://") {
133            Ok(DeltaTableUrl::Gcs(path.to_owned()))
134        } else if let Some(path) = path.strip_prefix("file://") {
135            Ok(DeltaTableUrl::Local(path.to_owned()))
136        } else {
137            Err(SinkError::DeltaLake(anyhow!(
138                "path should start with 's3://','s3a://'(s3) ,gs://(gcs) or file://(local)"
139            )))
140        }
141    }
142
143    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
144        let mut storage_options = HashMap::new();
145        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
146        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
147        let sdk_config = self.aws_auth_props.build_config().await?;
148        let credentials = sdk_config
149            .credentials_provider()
150            .ok_or_else(|| {
151                SinkError::Config(anyhow!(
152                    "s3.access.key and s3.secret.key is required with aws s3"
153                ))
154            })?
155            .as_ref()
156            .provide_credentials()
157            .await
158            .map_err(|e| SinkError::Config(e.into()))?;
159        let region = sdk_config.region();
160        let endpoint = sdk_config.endpoint_url();
161        storage_options.insert(
162            AWS_ACCESS_KEY_ID.to_owned(),
163            credentials.access_key_id().to_owned(),
164        );
165        storage_options.insert(
166            AWS_SECRET_ACCESS_KEY.to_owned(),
167            credentials.secret_access_key().to_owned(),
168        );
169        if endpoint.is_none() && region.is_none() {
170            return Err(SinkError::Config(anyhow!(
171                "s3.endpoint and s3.region need to be filled with at least one"
172            )));
173        }
174        storage_options.insert(
175            AWS_REGION.to_owned(),
176            region
177                .map(|r| r.as_ref().to_owned())
178                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
179        );
180        if let Some(s3_endpoint) = endpoint {
181            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
182        }
183        Ok(storage_options)
184    }
185}
186
/// The storage backend a table `location` resolves to, carrying the raw path.
enum DeltaTableUrl {
    // `s3://` or `s3a://` URI.
    S3(String),
    // Path with the `file://` prefix already stripped.
    Local(String),
    // `gs://` URI.
    Gcs(String),
}
192
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    /// Table location and storage credentials shared with the writer/coordinator.
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    /// Sink type string from the `WITH` clause; only append-only variants are accepted.
    pub r#type: String,

    /// Whether to use the Delta transaction log to deduplicate replayed epoch commits.
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub is_exactly_once: Option<bool>,
}
205
impl EnforceSecret for DeltaLakeConfig {
    // Delegate to the common options; the config adds no extra secret keys.
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}
211
212impl DeltaLakeConfig {
213    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
214        let config = serde_json::from_value::<DeltaLakeConfig>(
215            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
216        )
217        .map_err(|e| SinkError::Config(anyhow!(e)))?;
218        Ok(config)
219    }
220}
221
/// The DeltaLake sink: holds the parsed config plus the generic sink parameters
/// (schema, pk, sink id) needed to spawn writers and the commit coordinator.
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}
227
228impl EnforceSecret for DeltaLakeSink {
229    fn enforce_secret<'a>(
230        prop_iter: impl Iterator<Item = &'a str>,
231    ) -> crate::error::ConnectorResult<()> {
232        for prop in prop_iter {
233            DeltaLakeCommon::enforce_one(prop)?;
234        }
235        Ok(())
236    }
237}
238
impl DeltaLakeSink {
    /// Construct the sink from parsed config and the generic sink parameters.
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}
244
245fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
246    let result = match rw_data_type {
247        DataType::Boolean => {
248            matches!(
249                dl_data_type,
250                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
251            )
252        }
253        DataType::Int16 => {
254            matches!(
255                dl_data_type,
256                DeltaLakeDataType::Primitive(PrimitiveType::Short)
257            )
258        }
259        DataType::Int32 => {
260            matches!(
261                dl_data_type,
262                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
263            )
264        }
265        DataType::Int64 => {
266            matches!(
267                dl_data_type,
268                DeltaLakeDataType::Primitive(PrimitiveType::Long)
269            )
270        }
271        DataType::Float32 => {
272            matches!(
273                dl_data_type,
274                DeltaLakeDataType::Primitive(PrimitiveType::Float)
275            )
276        }
277        DataType::Float64 => {
278            matches!(
279                dl_data_type,
280                DeltaLakeDataType::Primitive(PrimitiveType::Double)
281            )
282        }
283        DataType::Decimal => {
284            matches!(
285                dl_data_type,
286                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
287            )
288        }
289        DataType::Date => {
290            matches!(
291                dl_data_type,
292                DeltaLakeDataType::Primitive(PrimitiveType::Date)
293            )
294        }
295        DataType::Varchar => {
296            matches!(
297                dl_data_type,
298                DeltaLakeDataType::Primitive(PrimitiveType::String)
299            )
300        }
301        DataType::Timestamptz => {
302            matches!(
303                dl_data_type,
304                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
305            )
306        }
307        DataType::Struct(rw_struct) => {
308            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
309                let mut result = true;
310                for ((rw_name, rw_type), dl_field) in
311                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
312                {
313                    result = check_field_type(rw_type, dl_field.data_type())?
314                        && result
315                        && rw_name.eq(dl_field.name());
316                }
317                result
318            } else {
319                false
320            }
321        }
322        DataType::List(rw_list) => {
323            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
324                check_field_type(rw_list.elem(), dl_list.element_type())?
325            } else {
326                false
327            }
328        }
329        _ => {
330            return Err(SinkError::DeltaLake(anyhow!(
331                "Type {:?} is not supported for DeltaLake sink.",
332                rw_data_type.to_owned()
333            )));
334        }
335    };
336    Ok(result)
337}
338
339impl Sink for DeltaLakeSink {
340    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;
341
342    const SINK_NAME: &'static str = DELTALAKE_SINK;
343
344    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
345        let inner = DeltaLakeSinkWriter::new(
346            self.config.clone(),
347            self.param.schema().clone(),
348            self.param.downstream_pk_or_empty(),
349        )
350        .await?;
351
352        let commit_checkpoint_interval =
353            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
354                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
355            );
356
357        let writer = CoordinatedLogSinker::new(
358            &writer_param,
359            self.param.clone(),
360            inner,
361            commit_checkpoint_interval,
362        )
363        .await?;
364
365        Ok(writer)
366    }
367
368    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
369        DeltaLakeConfig::from_btreemap(config.clone())?;
370        Ok(())
371    }
372
373    async fn validate(&self) -> Result<()> {
374        if self.config.r#type != SINK_TYPE_APPEND_ONLY
375            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
376        {
377            return Err(SinkError::Config(anyhow!(
378                "only append-only delta lake sink is supported",
379            )));
380        }
381        let table = self.config.common.create_deltalake_client().await?;
382        let snapshot = table.snapshot()?;
383        let delta_schema = snapshot.schema();
384        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
385            .fields()
386            .map(|f| (f.name(), f.data_type()))
387            .collect();
388        if deltalake_fields.len() != self.param.schema().fields().len() {
389            return Err(SinkError::DeltaLake(anyhow!(
390                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
391                self.param.schema().fields().len(),
392                deltalake_fields.len()
393            )));
394        }
395        for field in self.param.schema().fields() {
396            if !deltalake_fields.contains_key(&field.name) {
397                return Err(SinkError::DeltaLake(anyhow!(
398                    "column {} not found in deltalake table",
399                    field.name
400                )));
401            }
402            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
403                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
404            })?;
405            if !check_field_type(&field.data_type, deltalake_field_type)? {
406                return Err(SinkError::DeltaLake(anyhow!(
407                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
408                    field.name,
409                    deltalake_field_type,
410                    field.data_type
411                )));
412            }
413        }
414        if self.config.common.commit_checkpoint_interval == 0 {
415            return Err(SinkError::Config(anyhow!(
416                "`commit_checkpoint_interval` must be greater than 0"
417            )));
418        }
419        Ok(())
420    }
421
422    fn is_coordinated_sink(&self) -> bool {
423        true
424    }
425
426    async fn new_coordinator(
427        &self,
428        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
429    ) -> Result<SinkCommitCoordinator> {
430        let coordinator = DeltaLakeSinkCommitter {
431            table: self.config.common.create_deltalake_client().await?,
432            app_id: format!("risingwave-deltalake-{}", self.param.sink_id),
433            exactly_once: self.config.is_exactly_once.unwrap_or_default(),
434        };
435        if coordinator.exactly_once {
436            Ok(SinkCommitCoordinator::TwoPhase(Box::new(coordinator)))
437        } else {
438            Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
439        }
440    }
441}
442
443impl TryFrom<SinkParam> for DeltaLakeSink {
444    type Error = SinkError;
445
446    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
447        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
448        DeltaLakeSink::new(config, param)
449    }
450}
451
/// Per-parallelism writer that converts stream chunks into Arrow record batches
/// and buffers them in a delta-rs `RecordBatchWriter` until checkpoint flush.
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    // Kept for parity with other sink writers; not read after construction.
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    // delta-rs writer that buffers batches; `flush` returns the `Add` actions.
    writer: RecordBatchWriter,
    // Arrow schema derived from the writer, used for chunk conversion.
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}
463
464impl DeltaLakeSinkWriter {
465    pub async fn new(
466        config: DeltaLakeConfig,
467        schema: Schema,
468        pk_indices: Vec<usize>,
469    ) -> Result<Self> {
470        let dl_table = config.common.create_deltalake_client().await?;
471        let writer = RecordBatchWriter::for_table(&dl_table)?;
472        let dl_schema = writer.arrow_schema();
473
474        Ok(Self {
475            config,
476            schema,
477            pk_indices,
478            writer,
479            dl_schema,
480            dl_table,
481        })
482    }
483
484    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
485        let a = DeltaLakeConvert
486            .to_record_batch(self.dl_schema.clone(), &chunk)
487            .context("convert record batch error")
488            .map_err(SinkError::DeltaLake)?;
489        self.writer.write(a).await?;
490        Ok(())
491    }
492}
493
494#[async_trait]
495impl SinkWriter for DeltaLakeSinkWriter {
496    type CommitMetadata = Option<SinkMetadata>;
497
498    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
499        self.write(chunk).await
500    }
501
502    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
503        Ok(())
504    }
505
506    async fn abort(&mut self) -> Result<()> {
507        Ok(())
508    }
509
510    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
511        if !is_checkpoint {
512            return Ok(None);
513        }
514
515        let adds = self.writer.flush().await?;
516        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
517            adds,
518        })?))
519    }
520}
521
/// Commit coordinator: gathers all writers' `Add` actions for an epoch and
/// performs a single append commit to the Delta table.
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
    // Application id recorded in Delta transactions for exactly-once dedup.
    app_id: String,
    // Selects between single-phase and two-phase commit paths.
    exactly_once: bool,
}
527
impl DeltaLakeSinkCommitter {
    /// Deserialize each writer's `SinkMetadata` and flatten all `Add` actions
    /// into one list for a single table commit.
    fn collect_write_adds(metadata: &[SinkMetadata]) -> Result<Vec<Add>> {
        Ok(metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .collect())
    }

    /// Map a RisingWave epoch to the Delta application-transaction version.
    fn delta_txn_version(epoch: u64) -> Result<i64> {
        Ok(i64::try_from(epoch).context("delta lake epoch exceeds i64 range")?)
    }

    /// Commit the given `Add` actions as one append to the Delta table.
    ///
    /// With `txn_identity` set (exactly-once mode), the commit carries an
    /// application transaction, and a replayed epoch whose version is already
    /// in the Delta log is skipped instead of re-committed.
    async fn commit_actions(
        &mut self,
        epoch: u64,
        adds: Vec<Add>,
        txn_identity: Option<&DeltaLakeTxnIdentity>,
    ) -> Result<()> {
        // Nothing flushed for this epoch: no commit needed.
        if adds.is_empty() {
            return Ok(());
        }

        // Exactly-once dedup: skip if this (app_id, version) was committed before.
        if let Some(txn_identity) = txn_identity
            && self.is_txn_committed(txn_identity).await?
        {
            tracing::info!(
                "DeltaLake epoch {epoch} already committed for app id {}, txn version {}, skip committing again.",
                txn_identity.app_id,
                txn_identity.version
            );
            return Ok(());
        }

        let write_adds = adds.into_iter().map(Action::Add).collect();

        // Preserve the table's partitioning in the recorded write operation.
        let partition_cols = self
            .table
            .snapshot()?
            .metadata()
            .partition_columns()
            .to_vec();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        // Attach the application transaction so the commit is idempotent per epoch.
        let commit_builder = if let Some(txn_identity) = txn_identity {
            let commit_builder: CommitBuilder = CommitProperties::default()
                .with_application_transaction(Transaction::new(
                    &txn_identity.app_id,
                    txn_identity.version,
                ))
                .into();
            commit_builder
        } else {
            CommitBuilder::default()
        };
        let version = commit_builder
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        // Refresh local table state so later snapshots observe this commit.
        self.table.update_state().await?;
        tracing::debug!(
            "Succeeded to commit to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }

    /// Check whether `txn_identity` was already committed by reading the latest
    /// transaction version recorded for its app id in the Delta log.
    async fn is_txn_committed(&mut self, txn_identity: &DeltaLakeTxnIdentity) -> Result<bool> {
        self.table.update_state().await?;
        let log_store = self.table.log_store();
        let committed_version = self
            .table
            .snapshot()?
            .transaction_version(log_store.as_ref(), &txn_identity.app_id)
            .await?;
        Ok(committed_version.is_some_and(|version| version >= txn_identity.version))
    }
}
620
621#[async_trait::async_trait]
622impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
623    async fn init(&mut self) -> Result<()> {
624        tracing::info!("DeltaLake commit coordinator inited.");
625        Ok(())
626    }
627
628    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
629        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");
630
631        let adds = Self::collect_write_adds(&metadata)?;
632        self.commit_actions(epoch, adds, None).await
633    }
634}
635
636#[async_trait::async_trait]
637impl TwoPhaseCommitCoordinator for DeltaLakeSinkCommitter {
638    async fn init(&mut self) -> Result<()> {
639        tracing::info!("DeltaLake commit coordinator inited.");
640        Ok(())
641    }
642
643    async fn pre_commit(
644        &mut self,
645        epoch: u64,
646        metadata: Vec<SinkMetadata>,
647        _schema_change: Option<risingwave_pb::stream_plan::PbSinkSchemaChange>,
648    ) -> Result<Option<Vec<u8>>> {
649        tracing::debug!("Starting DeltaLake pre commit in epoch {epoch}.");
650
651        let adds = Self::collect_write_adds(&metadata)?;
652        if adds.is_empty() {
653            return Ok(None);
654        }
655
656        let txn_identity = DeltaLakeTxnIdentity {
657            app_id: self.app_id.clone(),
658            version: Self::delta_txn_version(epoch)?,
659        };
660        Ok(Some(
661            DeltaLakePreCommitMetadata { adds, txn_identity }.try_into_bytes()?,
662        ))
663    }
664
665    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
666        tracing::debug!("Starting DeltaLake exactly-once commit in epoch {epoch}.");
667
668        if commit_metadata.is_empty() {
669            return Ok(());
670        }
671
672        let pre_commit_metadata = DeltaLakePreCommitMetadata::try_from_bytes(&commit_metadata)?;
673        self.commit_actions(
674            epoch,
675            pre_commit_metadata.adds,
676            Some(&pre_commit_metadata.txn_identity),
677        )
678        .await
679    }
680
681    async fn abort(&mut self, epoch: u64, _commit_metadata: Vec<u8>) {
682        tracing::debug!("Abort not implemented yet for DeltaLake epoch {epoch}");
683    }
684}
685
/// Per-writer flush result: the `Add` actions produced by one checkpoint flush,
/// carried to the coordinator inside `SinkMetadata`.
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}
690
/// Payload persisted between the two commit phases in exactly-once mode:
/// the epoch's `Add` actions plus the transaction identity for dedup.
#[derive(Serialize, Deserialize)]
struct DeltaLakePreCommitMetadata {
    adds: Vec<Add>,
    txn_identity: DeltaLakeTxnIdentity,
}
696
/// Identity of one Delta application transaction: the sink's app id and the
/// epoch-derived version recorded in the Delta log.
#[derive(Serialize, Deserialize)]
struct DeltaLakeTxnIdentity {
    app_id: String,
    version: i64,
}
702
703impl DeltaLakePreCommitMetadata {
704    fn try_into_bytes(self) -> Result<Vec<u8>> {
705        Ok(serde_json::to_vec(&self).context("cannot serialize deltalake pre commit metadata")?)
706    }
707
708    fn try_from_bytes(value: &[u8]) -> Result<Self> {
709        Ok(serde_json::from_slice(value)
710            .context("cannot deserialize deltalake pre commit metadata")?)
711    }
712}
713
714impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
715    type Error = SinkError;
716
717    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
718        let metadata =
719            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
720        Ok(SinkMetadata {
721            metadata: Some(Serialized(SerializedMetadata { metadata })),
722        })
723    }
724}
725
726impl DeltaLakeWriteResult {
727    fn try_from(value: &SinkMetadata) -> Result<Self> {
728        if let Some(Serialized(v)) = &value.metadata {
729            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
730                .context("Can't deserialize deltalake sink metadata")?;
731            Ok(DeltaLakeWriteResult { adds })
732        } else {
733            bail!("Can't create deltalake sink write result from empty data!")
734        }
735    }
736}
737
/// Lift any delta-rs table error into the sink's error type so `?` works on
/// delta-rs calls throughout this module.
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}
743
#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::writer::SinkWriter;
    use crate::sink::{SinglePhaseCommitCoordinator, TwoPhaseCommitCoordinator};

    /// Single-phase path end to end: create a local Delta table, write one
    /// chunk, commit through the single-phase coordinator, and verify exactly
    /// one data file landed.
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        // Create a (id int, name string) Delta table on local disk.
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        // RisingWave-side schema matching the Delta table above.
        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
            app_id: "test-single-phase".to_owned(),
            exactly_once: false,
        };
        // Checkpoint barrier flushes the writer and yields commit metadata.
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        SinglePhaseCommitCoordinator::commit_data(&mut committer, 1, vec![metadata])
            .await
            .unwrap();
        let snapshot = committer.table.snapshot().unwrap();
        assert_eq!(1, snapshot.log_data().num_files());
    }

    /// Exactly-once path: pre-commit then commit, verify the application
    /// transaction version is recorded, and re-committing the same payload is
    /// a no-op (idempotent replay).
    #[tokio::test]
    async fn test_deltalake_exactly_once() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        // Create a (id int, name string) Delta table on local disk.
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
            "is_exactly_once".to_owned() => "true".to_owned(),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();

        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
            app_id: "test-exactly-once".to_owned(),
            exactly_once: true,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        // Phase one: build the persisted pre-commit payload.
        let pre_commit_metadata =
            TwoPhaseCommitCoordinator::pre_commit(&mut committer, 1, vec![metadata], None)
                .await
                .unwrap()
                .unwrap();

        // Phase two: first commit lands a new table version with one file.
        TwoPhaseCommitCoordinator::commit_data(&mut committer, 1, pre_commit_metadata.clone())
            .await
            .unwrap();
        assert_eq!(committer.table.version(), Some(1));
        assert_eq!(
            committer.table.snapshot().unwrap().log_data().num_files(),
            1
        );

        // The app transaction version for this epoch must be in the Delta log.
        let log_store = committer.table.log_store();
        let txn_version = committer
            .table
            .snapshot()
            .unwrap()
            .transaction_version(log_store.as_ref(), &committer.app_id)
            .await
            .unwrap();
        assert_eq!(txn_version, Some(1));

        // Replaying the same payload must be deduplicated: version and file
        // count stay unchanged.
        TwoPhaseCommitCoordinator::commit_data(&mut committer, 1, pre_commit_metadata)
            .await
            .unwrap();
        assert_eq!(committer.table.version(), Some(1));
        assert_eq!(
            committer.table.snapshot().unwrap().log_data().num_files(),
            1
        );
    }
}