// risingwave_connector/sink/deltalake.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::num::NonZeroU64;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use deltalake::DeltaTable;
22use deltalake::aws::storage::s3_constants::{
23    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
24    AWS_SECRET_ACCESS_KEY,
25};
26use deltalake::kernel::transaction::CommitBuilder;
27use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
28use deltalake::protocol::{DeltaOperation, SaveMode};
29use deltalake::writer::{DeltaWriter, RecordBatchWriter};
30use phf::{Set, phf_set};
31use risingwave_common::array::StreamChunk;
32use risingwave_common::array::arrow::DeltaLakeConvert;
33use risingwave_common::bail;
34use risingwave_common::catalog::Schema;
35use risingwave_common::types::DataType;
36use risingwave_common::util::iter_util::ZipEqDebug;
37use risingwave_pb::connector_service::SinkMetadata;
38use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
39use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
40use serde::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use tokio::sync::mpsc::UnboundedSender;
43use url::Url;
44use with_options::WithOptions;
45
46use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
47use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
48use crate::sink::coordinate::CoordinatedLogSinker;
49use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
50use crate::sink::writer::SinkWriter;
51use crate::sink::{
52    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
53    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
54    SinkWriterParam,
55};
56
/// Fallback AWS region written into the storage options when `s3.region` is
/// absent (only reachable when an endpoint is configured; see
/// `build_delta_lake_config_for_aws`).
pub const DEFAULT_REGION: &str = "us-east-1";
/// Storage-option key under which the GCS service-account credential is
/// passed to the `deltalake` GCS handler.
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

/// Connector name of the DeltaLake sink.
pub const DELTALAKE_SINK: &str = "deltalake";
61
/// Options shared by the DeltaLake sink: table location, storage credentials,
/// and commit cadence. Deserialized from the sink's `WITH` properties.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    /// Table URI; must start with `s3://`/`s3a://`, `gs://`, or `file://`.
    #[serde(rename = "location")]
    pub location: String,
    /// AWS credentials/region/endpoint, flattened from the same property map.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    /// GCS service-account credential; required when `location` is `gs://`.
    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}
78
79impl EnforceSecret for DeltaLakeCommon {
80    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
81        "gcs.service.account",
82    };
83
84    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
85        AwsAuthProps::enforce_one(prop)?;
86        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
87            return Err(EnforceSecretError {
88                key: prop.to_owned(),
89            }
90            .into());
91        }
92
93        Ok(())
94    }
95}
96
impl DeltaLakeCommon {
    /// Opens the Delta table referenced by `self.location`, registering the
    /// matching object-store handlers and building the storage options for
    /// the location's scheme (S3, GCS, or local filesystem).
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                // `get_table_url` stripped the `file://` prefix, so re-attach
                // it to form a parseable URL.
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

    /// Classifies `path` by its URI scheme. Local paths are returned with
    /// the `file://` prefix stripped; unknown schemes are an error.
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://','s3a://'(s3) ,gs://(gcs) or file://(local)"
            )))
        }
    }

    /// Builds the storage-option map for S3-backed tables from the resolved
    /// AWS SDK config: resolvable credentials are required, and at least one
    /// of region/endpoint must be set ([`DEFAULT_REGION`] is used when only
    /// the endpoint is given).
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key is required with aws s3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        // Without either a region or an endpoint the object store cannot be
        // addressed at all, so reject the config outright.
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "s3.endpoint and s3.region need to be filled with at least one"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}
186
/// Table location classified by the URI scheme of the `location` option.
enum DeltaTableUrl {
    /// `s3://` or `s3a://` path (kept verbatim).
    S3(String),
    /// Local filesystem path with the `file://` prefix already stripped.
    Local(String),
    /// `gs://` path (kept verbatim).
    Gcs(String),
}
192
/// Full DeltaLake sink configuration: the shared connection options plus the
/// sink `type` (only append-only variants pass validation).
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    /// Value of the `type` property, e.g. `append-only`.
    pub r#type: String,
}
201
202impl EnforceSecret for DeltaLakeConfig {
203    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
204        DeltaLakeCommon::enforce_one(prop)
205    }
206}
207
208impl DeltaLakeConfig {
209    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
210        let config = serde_json::from_value::<DeltaLakeConfig>(
211            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
212        )
213        .map_err(|e| SinkError::Config(anyhow!(e)))?;
214        Ok(config)
215    }
216}
217
/// Append-only DeltaLake sink whose flushed files are committed to the table
/// log by a central coordinator.
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    // Sink parameters (schema, downstream pk, properties) from the frontend.
    param: SinkParam,
}
223
224impl EnforceSecret for DeltaLakeSink {
225    fn enforce_secret<'a>(
226        prop_iter: impl Iterator<Item = &'a str>,
227    ) -> crate::error::ConnectorResult<()> {
228        for prop in prop_iter {
229            DeltaLakeCommon::enforce_one(prop)?;
230        }
231        Ok(())
232    }
233}
234
235impl DeltaLakeSink {
236    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
237        Ok(Self { config, param })
238    }
239}
240
241fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
242    let result = match rw_data_type {
243        DataType::Boolean => {
244            matches!(
245                dl_data_type,
246                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
247            )
248        }
249        DataType::Int16 => {
250            matches!(
251                dl_data_type,
252                DeltaLakeDataType::Primitive(PrimitiveType::Short)
253            )
254        }
255        DataType::Int32 => {
256            matches!(
257                dl_data_type,
258                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
259            )
260        }
261        DataType::Int64 => {
262            matches!(
263                dl_data_type,
264                DeltaLakeDataType::Primitive(PrimitiveType::Long)
265            )
266        }
267        DataType::Float32 => {
268            matches!(
269                dl_data_type,
270                DeltaLakeDataType::Primitive(PrimitiveType::Float)
271            )
272        }
273        DataType::Float64 => {
274            matches!(
275                dl_data_type,
276                DeltaLakeDataType::Primitive(PrimitiveType::Double)
277            )
278        }
279        DataType::Decimal => {
280            matches!(
281                dl_data_type,
282                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
283            )
284        }
285        DataType::Date => {
286            matches!(
287                dl_data_type,
288                DeltaLakeDataType::Primitive(PrimitiveType::Date)
289            )
290        }
291        DataType::Varchar => {
292            matches!(
293                dl_data_type,
294                DeltaLakeDataType::Primitive(PrimitiveType::String)
295            )
296        }
297        DataType::Timestamptz => {
298            matches!(
299                dl_data_type,
300                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
301            )
302        }
303        DataType::Struct(rw_struct) => {
304            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
305                let mut result = true;
306                for ((rw_name, rw_type), dl_field) in
307                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
308                {
309                    result = check_field_type(rw_type, dl_field.data_type())?
310                        && result
311                        && rw_name.eq(dl_field.name());
312                }
313                result
314            } else {
315                false
316            }
317        }
318        DataType::List(rw_list) => {
319            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
320                check_field_type(rw_list.elem(), dl_list.element_type())?
321            } else {
322                false
323            }
324        }
325        _ => {
326            return Err(SinkError::DeltaLake(anyhow!(
327                "Type {:?} is not supported for DeltaLake sink.",
328                rw_data_type.to_owned()
329            )));
330        }
331    };
332    Ok(result)
333}
334
335impl Sink for DeltaLakeSink {
336    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;
337
338    const SINK_NAME: &'static str = DELTALAKE_SINK;
339
340    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
341        let inner = DeltaLakeSinkWriter::new(
342            self.config.clone(),
343            self.param.schema().clone(),
344            self.param.downstream_pk_or_empty(),
345        )
346        .await?;
347
348        let commit_checkpoint_interval =
349            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
350                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
351            );
352
353        let writer = CoordinatedLogSinker::new(
354            &writer_param,
355            self.param.clone(),
356            inner,
357            commit_checkpoint_interval,
358        )
359        .await?;
360
361        Ok(writer)
362    }
363
364    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
365        DeltaLakeConfig::from_btreemap(config.clone())?;
366        Ok(())
367    }
368
369    async fn validate(&self) -> Result<()> {
370        if self.config.r#type != SINK_TYPE_APPEND_ONLY
371            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
372        {
373            return Err(SinkError::Config(anyhow!(
374                "only append-only delta lake sink is supported",
375            )));
376        }
377        let table = self.config.common.create_deltalake_client().await?;
378        let snapshot = table.snapshot()?;
379        let delta_schema = snapshot.schema();
380        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
381            .fields()
382            .map(|f| (f.name(), f.data_type()))
383            .collect();
384        if deltalake_fields.len() != self.param.schema().fields().len() {
385            return Err(SinkError::DeltaLake(anyhow!(
386                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
387                self.param.schema().fields().len(),
388                deltalake_fields.len()
389            )));
390        }
391        for field in self.param.schema().fields() {
392            if !deltalake_fields.contains_key(&field.name) {
393                return Err(SinkError::DeltaLake(anyhow!(
394                    "column {} not found in deltalake table",
395                    field.name
396                )));
397            }
398            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
399                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
400            })?;
401            if !check_field_type(&field.data_type, deltalake_field_type)? {
402                return Err(SinkError::DeltaLake(anyhow!(
403                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
404                    field.name,
405                    deltalake_field_type,
406                    field.data_type
407                )));
408            }
409        }
410        if self.config.common.commit_checkpoint_interval == 0 {
411            return Err(SinkError::Config(anyhow!(
412                "`commit_checkpoint_interval` must be greater than 0"
413            )));
414        }
415        Ok(())
416    }
417
418    fn is_coordinated_sink(&self) -> bool {
419        true
420    }
421
422    async fn new_coordinator(
423        &self,
424        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
425    ) -> Result<SinkCommitCoordinator> {
426        let coordinator = DeltaLakeSinkCommitter {
427            table: self.config.common.create_deltalake_client().await?,
428        };
429        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
430    }
431}
432
433impl TryFrom<SinkParam> for DeltaLakeSink {
434    type Error = SinkError;
435
436    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
437        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
438        DeltaLakeSink::new(config, param)
439    }
440}
441
/// Per-parallelism sink writer: converts stream chunks into Arrow record
/// batches and buffers them in a DeltaLake [`RecordBatchWriter`] until a
/// checkpoint barrier flushes them.
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    // Buffers converted record batches; flushed on checkpoint barriers.
    writer: RecordBatchWriter,
    // Arrow schema taken from the writer, used to convert incoming chunks.
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}
453
454impl DeltaLakeSinkWriter {
455    pub async fn new(
456        config: DeltaLakeConfig,
457        schema: Schema,
458        pk_indices: Vec<usize>,
459    ) -> Result<Self> {
460        let dl_table = config.common.create_deltalake_client().await?;
461        let writer = RecordBatchWriter::for_table(&dl_table)?;
462        let dl_schema = writer.arrow_schema();
463
464        Ok(Self {
465            config,
466            schema,
467            pk_indices,
468            writer,
469            dl_schema,
470            dl_table,
471        })
472    }
473
474    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
475        let a = DeltaLakeConvert
476            .to_record_batch(self.dl_schema.clone(), &chunk)
477            .context("convert record batch error")
478            .map_err(SinkError::DeltaLake)?;
479        self.writer.write(a).await?;
480        Ok(())
481    }
482}
483
484#[async_trait]
485impl SinkWriter for DeltaLakeSinkWriter {
486    type CommitMetadata = Option<SinkMetadata>;
487
488    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
489        self.write(chunk).await
490    }
491
492    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
493        Ok(())
494    }
495
496    async fn abort(&mut self) -> Result<()> {
497        Ok(())
498    }
499
500    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
501        if !is_checkpoint {
502            return Ok(None);
503        }
504
505        let adds = self.writer.flush().await?;
506        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
507            adds,
508        })?))
509    }
510}
511
/// Single-phase commit coordinator: gathers the writers' `Add` actions and
/// commits them to the Delta table log as one append transaction per epoch.
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}
515
#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator inited.");
        Ok(())
    }

    /// Commits all `Add` actions collected from the writers for this epoch as
    /// a single append transaction on the Delta table. A no-op when the
    /// epoch produced no files.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");

        // Decode each writer's serialized metadata back into `Add` actions.
        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        // Nothing to commit — avoid writing an empty Delta log entry.
        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self
            .table
            .snapshot()?
            .metadata()
            .partition_columns()
            .clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        // Refresh the cached table state so the next commit builds on the
        // version just written.
        self.table.update_state().await?;
        tracing::debug!(
            "Succeeded to commit to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}
571
/// Per-checkpoint result a writer ships to the coordinator: the `Add`
/// actions for the files flushed during that checkpoint.
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}
576
577impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
578    type Error = SinkError;
579
580    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
581        let metadata =
582            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
583        Ok(SinkMetadata {
584            metadata: Some(Serialized(SerializedMetadata { metadata })),
585        })
586    }
587}
588
589impl DeltaLakeWriteResult {
590    fn try_from(value: &SinkMetadata) -> Result<Self> {
591        if let Some(Serialized(v)) = &value.metadata {
592            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
593                .context("Can't deserialize deltalake sink metadata")?;
594            Ok(DeltaLakeWriteResult { adds })
595        } else {
596            bail!("Can't create deltalake sink write result from empty data!")
597        }
598    }
599}
600
601impl From<::deltalake::DeltaTableError> for SinkError {
602    fn from(value: ::deltalake::DeltaTableError) -> Self {
603        SinkError::DeltaLake(anyhow!(value))
604    }
605}
606
#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    /// End-to-end smoke test against a local (`file://`) Delta table: create
    /// the table, write one 3-row chunk, flush it via a checkpoint barrier,
    /// commit through the coordinator, and assert one data file landed.
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        // Pre-create the target table with columns (id: int, name: string).
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        // RisingWave-side schema matching the table created above.
        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        // The checkpoint barrier flushes buffered data into commit metadata,
        // which the coordinator then commits as one table version.
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit_data(1, vec![metadata]).await.unwrap();
        let snapshot = committer.table.snapshot().unwrap();
        assert_eq!(1, snapshot.log_data().num_files());
    }
}