risingwave_connector/sink/deltalake.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::operations::transaction::CommitBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;

use super::coordinate::CoordinatedLogSinker;
use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::writer::SinkWriter;
use super::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};
use crate::connector_common::AwsAuthProps;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

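/// Options shared by all DeltaLake sinks.
///
/// `location` selects the storage backend by URI scheme: `s3://` / `s3a://` for AWS S3
/// (credentials come from the flattened [`AwsAuthProps`]), `gs://` for Google Cloud Storage
/// (which additionally requires `gcs.service.account`), and `file://` for a local table.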
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit to the Delta table every n (> 0) checkpoints; defaults to 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl DeltaLakeCommon {
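    /// Opens the Delta table at `self.location`, registering the matching object-store
    /// handlers and building the storage options required by the URI scheme.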
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required when sinking to Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

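    /// Classifies `path` by its URI scheme; anything other than `s3://`, `s3a://`,
    /// `gs://`, or `file://` is rejected.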
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

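    /// Resolves AWS credentials, region, and endpoint from [`AwsAuthProps`] and maps them
    /// onto the `deltalake` storage option keys (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
    /// `AWS_REGION`, `AWS_ENDPOINT_URL`).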
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be specified"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl DeltaLakeConfig {
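    /// Builds the sink config from the user-supplied `WITH` properties by round-tripping
    /// them through `serde_json`.
    ///
    /// A minimal sketch of the expected properties (values are illustrative; the exact set
    /// depends on the storage backend):
    ///
    /// ```ignore
    /// use maplit::btreemap;
    ///
    /// let config = DeltaLakeConfig::from_btreemap(btreemap! {
    ///     "connector".to_owned() => "deltalake".to_owned(),
    ///     "type".to_owned() => "append-only".to_owned(),
    ///     "location".to_owned() => "file:///tmp/delta-table".to_owned(),
    /// })?;
    /// ```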
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

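/// Checks that a RisingWave [`DataType`] is compatible with the corresponding DeltaLake
/// column type, recursing into struct fields and list element types.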
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_, _))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list, dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "DeltaLake sink does not support type {:?}",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk.clone(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

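    /// Validation performed when the sink is created: the sink must be append-only
    /// (possibly via `force_append_only`), the Delta table must have the same number of
    /// columns as the RisingWave schema with matching names and compatible types, and
    /// `commit_checkpoint_interval` must be greater than 0.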
    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only DeltaLake sink is supported"
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Column count mismatch: RisingWave has {}, DeltaLake has {}",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in the DeltaLake table",
                    field.name
                )));
            }
            let deltalake_field_type: &&DeltaLakeDataType = deltalake_fields
                .get(&field.name)
                .ok_or_else(|| SinkError::DeltaLake(anyhow!("cannot find field type")))?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} type mismatch: DeltaLake is {:?}, RisingWave is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

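/// Sink writer. Incoming chunks are converted to Arrow record batches and buffered in a
/// [`RecordBatchWriter`]; on checkpoint barriers the buffer is flushed to parquet files and
/// the resulting `Add` actions are handed to the commit coordinator as serialized
/// [`SinkMetadata`].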
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

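    /// Converts the [`StreamChunk`] into an Arrow record batch with the Delta schema and
    /// appends it to the buffered [`RecordBatchWriter`]; nothing is flushed to storage here.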
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

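/// Converts a DeltaLake [`StructType`] into the Arrow schema expected by the
/// [`RecordBatchWriter`], preserving field names and nullability.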
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            deltalake::arrow::datatypes::DataType::try_from(field.data_type())
                .context("convert schema error")
                .map_err(SinkError::DeltaLake)?,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

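    /// On checkpoint barriers, flushes the buffered record batches to parquet files and
    /// returns the resulting `Add` actions as serialized metadata for the coordinator;
    /// non-checkpoint barriers are a no-op.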
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

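/// Commit coordinator for the DeltaLake sink. It collects the `Add` actions produced by all
/// writers in an epoch and commits them to the Delta log as a single `SaveMode::Append` write.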
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to the DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

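/// Metadata exchanged between writer and coordinator: the `Add` actions from one flush,
/// JSON-serialized and carried inside [`SinkMetadata`].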
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::deltalake::DeltaLakeSinkCommitter;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata]).await.unwrap();

        // The following code reads the DeltaLake table back to verify the test data written
        // above. It requires the `datafusion` feature of `deltalake`, i.e.
        // `deltalake = { workspace = true, features = ["datafusion"] }` in the
        // `dev-dependencies` section of this crate's `Cargo.toml`. Note that enabling the
        // `datafusion` feature increases compile time and release binary size, even though
        // it is only a dev dependency.

        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.get(0).unwrap().column(0).len());
        assert_eq!(3, batches.get(0).unwrap().column(1).len());
    }
}
658}