risingwave_connector/sink/deltalake/imp.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

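/// Options shared by the Delta Lake sink: the table `location`, the flattened AWS
/// credentials from `AwsAuthProps`, an optional GCS service account key, and the
/// checkpoint interval at which the coordinator commits.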
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n (> 0) checkpoints; the default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
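    /// Opens the Delta table referenced by `location`, registering the S3 or GCS
    /// object-store handlers that match the path scheme before loading the table.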
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(&s3_path, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required for Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

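    /// Classifies the configured path by its scheme: `s3://`/`s3a://` (S3), `gs://`
    /// (GCS), or `file://` (local; the prefix is stripped).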
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

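    /// Builds the storage options map for S3 from `AwsAuthProps`: static credentials
    /// plus a region and/or custom endpoint (HTTP and unsafe rename are always allowed).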
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

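/// Full sink configuration: the shared `DeltaLakeCommon` options plus the sink
/// `type` (only append-only is accepted; see `validate`).
///
/// A minimal sketch of the properties this config deserializes from (illustrative
/// only; the S3 credential keys follow the error messages in this module and are
/// ultimately defined by `AwsAuthProps`):
///
/// ```ignore
/// let props = maplit::btreemap! {
///     "connector".to_owned() => "deltalake".to_owned(),
///     "type".to_owned() => "append-only".to_owned(),
///     "location".to_owned() => "s3a://bucket/path/to/table".to_owned(),
///     "s3.access.key".to_owned() => "...".to_owned(),
///     "s3.secret.key".to_owned() => "...".to_owned(),
///     "commit_checkpoint_interval".to_owned() => "10".to_owned(),
/// };
/// let config = DeltaLakeConfig::from_btreemap(props)?;
/// ```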
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

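/// Checks that a RisingWave column type can be written into the corresponding Delta
/// Lake field type. Structs are compared field by field (names included), lists by
/// their element type; unsupported RisingWave types yield an error.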
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list, dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "deltalake does not support type {:?}",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

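// Sink wiring: each `DeltaLakeSinkWriter` converts chunks to Arrow record batches and
// flushes them on checkpoint barriers; the resulting `Add` actions are reported as
// metadata to a single `DeltaLakeSinkCommitter`, which appends them to the Delta log
// (hence `is_coordinated_sink` returns true).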
impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = super::DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk.clone(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

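/// Streaming writer that converts each `StreamChunk` into an Arrow `RecordBatch` and
/// buffers it in a `RecordBatchWriter`; buffered data is flushed at checkpoint
/// barriers and handed to the coordinator as `Add` actions.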
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

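/// Converts the Delta table's logical schema into an Arrow schema so that incoming
/// chunks can be encoded as `RecordBatch`es with matching field types and nullability.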
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let arrow_field_type = deltalake::arrow::datatypes::DataType::try_from(field.data_type())
            .with_context(|| {
                format!(
                    "Failed to convert DeltaLake data type {:?} to Arrow data type for field '{}'",
                    field.data_type(),
                    field.name()
                )
            })
            .map_err(SinkError::DeltaLake)?;
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            arrow_field_type,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

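// `SinkWriter` hookup: data is buffered on every `write_batch`; on a checkpoint
// barrier the buffered files are flushed and the resulting `Add` actions are
// serialized into `SinkMetadata` for the commit coordinator. Non-checkpoint barriers
// produce no metadata.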
#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

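/// Commit coordinator: collects the `Add` actions reported by all writers for an
/// epoch and commits them to the Delta log as a single append transaction.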
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

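    /// Gathers all writers' `Add` actions for this epoch and, unless the set is empty,
    /// commits them as one `SaveMode::Append` write (preserving the table's partition
    /// columns), then refreshes the local table state.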
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to the DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

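/// Per-checkpoint write result exchanged between writers and the coordinator: the
/// `Add` actions for the files flushed by one writer, carried as JSON inside
/// `SinkMetadata`.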
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata]).await.unwrap();

        // The following code reads the Delta Lake table back to verify the test data
        // written above. It requires `deltalake = { workspace = true, features = ["datafusion"] }`
        // in the `dev-dependencies` section of this crate's `Cargo.toml`.
        //
        // Note that enabling the `datafusion` feature of `deltalake` increases compile time
        // and the output binary size in release builds, even though it is only a dev dependency.

        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.get(0).unwrap().column(0).len());
        assert_eq!(3, batches.get(0).unwrap().column(1).len());
    }
}