risingwave_connector/sink/deltalake.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::datafusion::datasource::TableProvider;
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

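/// Options shared by every DeltaLake sink configuration.
///
/// `location` selects the storage backend by scheme: `s3://`/`s3a://` (S3), `gs://` (GCS),
/// or `file://` (local filesystem); see `get_table_url` below. AWS credentials come from the
/// flattened [`AwsAuthProps`] (surfaced in error messages as `s3.access.key`, `s3.secret.key`,
/// `s3.region`, and `s3.endpoint`), while GCS access requires `gcs.service.account`.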
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit to the Delta table every n (> 0) checkpoints; defaults to 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
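    /// Opens the Delta table at `self.location`, registering the appropriate object-store
    /// handlers and building the storage options for the matching backend
    /// (S3, GCS, or the local filesystem).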
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path)
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path)
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options)
                    .await?
            }
        };
        Ok(table)
    }

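    /// Classifies `path` by its scheme prefix. For illustration (hypothetical paths):
    /// `s3://bucket/table` or `s3a://bucket/table` map to `DeltaTableUrl::S3`,
    /// `gs://bucket/table` maps to `DeltaTableUrl::Gcs`, and `file:///tmp/table` maps to
    /// `DeltaTableUrl::Local` with the `file://` prefix stripped. Any other prefix is rejected.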
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

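    /// Builds the `storage_options` map handed to `deltalake` for S3-backed tables:
    /// static credentials resolved via [`AwsAuthProps`], a region (falling back to
    /// [`DEFAULT_REGION`]), and an optional custom endpoint. HTTP endpoints and unsafe
    /// renames are unconditionally allowed.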
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be specified"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

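/// Deserialized `WITH` options of the DeltaLake sink: the shared [`DeltaLakeCommon`] options
/// plus the sink `type`, which must be an append-only variant (see `DeltaLakeSink::validate`).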
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
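    /// Builds the config from the flattened `WITH` properties of the sink by round-tripping
    /// them through `serde_json`.
    ///
    /// A minimal sketch of how it is used (the property values below are placeholders,
    /// not a working setup):
    ///
    /// ```ignore
    /// use maplit::btreemap;
    ///
    /// let props = btreemap! {
    ///     "connector".to_owned() => "deltalake".to_owned(),
    ///     "type".to_owned() => "append-only".to_owned(),
    ///     "location".to_owned() => "s3a://my-bucket/my-table".to_owned(),
    /// };
    /// let config = DeltaLakeConfig::from_btreemap(props).unwrap();
    /// // Unspecified fields fall back to their defaults, e.g.
    /// // `commit_checkpoint_interval` defaults to 10.
    /// assert_eq!(config.common.commit_checkpoint_interval, 10);
    /// ```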
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

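/// Returns whether the RisingWave type `rw_data_type` is compatible with the Delta Lake column
/// type `dl_data_type`. Struct and list types are checked recursively (struct fields must also
/// match by name); RisingWave types with no Delta Lake counterpart yield an error.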
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval must be greater than 0; this is checked during config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

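    // Checks that the sink is (force-)append-only, that the Delta table can be opened, that
    // both schemas have the same number of columns, and that every RisingWave column exists in
    // the Delta schema with a compatible type (via `check_field_type`).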
    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        };
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

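/// Streaming writer of the DeltaLake sink. Incoming chunks are converted to Arrow record
/// batches against the Delta table's schema and buffered in a [`RecordBatchWriter`]; the
/// buffer is flushed into `Add` actions on every checkpoint barrier.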
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        // Use TableProvider::schema() to get the arrow schema directly
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> = dl_table.schema();

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

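    /// Converts the chunk into an Arrow `RecordBatch` matching the Delta table's schema and
    /// hands it to the underlying [`RecordBatchWriter`], which buffers it until the next
    /// checkpoint barrier flushes the data.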
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(batch).await?;
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

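/// Single-phase commit coordinator of the DeltaLake sink. It gathers the `Add` actions flushed
/// by all writers for an epoch and commits them to the Delta log as a single append operation.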
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
        if let Some(schema_change) = schema_change {
            return Err(anyhow!(
                "DeltaLake sink does not support schema changes, but got: {:?}",
                schema_change
            )
            .into());
        }

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.snapshot()?.metadata().partition_columns().clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to the DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

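/// Per-checkpoint writer output: the `Add` actions returned by flushing the
/// [`RecordBatchWriter`], serialized into [`SinkMetadata`] for the commit coordinator.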
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use url::Url;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

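    // End-to-end smoke test: create a local Delta table, write one chunk through the sink
    // writer, commit it via the single-phase coordinator, and read the rows back with
    // DataFusion.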
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata], None).await.unwrap();

        // Read the DeltaLake table back to verify the test data written above. This relies on
        // the `datafusion` feature of `deltalake`; note that the feature increases compile time
        // and the output binary size of release builds, even when it is only pulled in as a dev
        // dependency.
        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(Url::parse(&format!("file://{}", path)).unwrap())
            .await
            .unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.get(0).unwrap().column(0).len());
        assert_eq!(3, batches.get(0).unwrap().column(1).len());
    }
}