risingwave_connector/sink/deltalake.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

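//! DeltaLake sink: an append-only, coordinated sink that converts stream chunks into Arrow
//! record batches and commits the resulting `Add` actions to the Delta log on checkpoint
//! barriers via a single-phase commit coordinator.
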
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::datafusion::datasource::TableProvider;
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

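/// Connection options shared by all DeltaLake sink configurations: the table `location`,
/// AWS credentials (flattened from [`AwsAuthProps`]), an optional GCS service account, and
/// the commit interval measured in checkpoints.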
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
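    /// Opens the Delta table at `location`. Registers the matching object-store handlers and
    /// builds storage options for S3 (including `s3a://` paths), GCS, or the local filesystem.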
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required for Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

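    /// Classifies `path` by its scheme so the matching object store can be used.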
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

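    /// Builds the S3 storage options map from the resolved AWS SDK config: credentials,
    /// region (falling back to [`DEFAULT_REGION`]), and an optional custom endpoint.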
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

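/// Full sink configuration: [`DeltaLakeCommon`] (flattened) plus the sink `type`, which must
/// be append-only for this sink.
///
/// A minimal sketch of building the config from `WITH` properties, mirroring the unit test
/// below (the `file://` location is illustrative):
///
/// ```ignore
/// use maplit::btreemap;
///
/// let config = DeltaLakeConfig::from_btreemap(btreemap! {
///     "connector".to_owned() => "deltalake".to_owned(),
///     "type".to_owned() => "append-only".to_owned(),
///     "force_append_only".to_owned() => "true".to_owned(),
///     "location".to_owned() => "file:///tmp/delta_demo".to_owned(),
/// })
/// .unwrap();
/// ```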
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

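/// The DeltaLake sink itself: holds the parsed [`DeltaLakeConfig`] and the [`SinkParam`]
/// (schema, downstream primary key, properties) it was created from.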
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

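/// Checks that a RisingWave [`DataType`] is compatible with the corresponding DeltaLake
/// column type, recursing into struct fields and list element types. Returns an error for
/// RisingWave types that the DeltaLake sink does not support at all.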
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        };
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

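/// Per-parallelism writer. Converts each [`StreamChunk`] into an Arrow record batch and
/// buffers it in a [`RecordBatchWriter`]; on checkpoint barriers the buffered files are
/// flushed and their `Add` actions are handed to the commit coordinator as metadata.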
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        // Use `TableProvider::schema()` to get the Arrow schema directly. The fully-qualified
        // call avoids resolving to `DeltaTable`'s inherent `schema()`, which returns the kernel
        // schema rather than the Arrow one.
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> = TableProvider::schema(&dl_table);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult { adds })?))
    }
}

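/// Single-phase commit coordinator: collects the serialized `Add` actions from all writers
/// for an epoch and commits them to the Delta log as one append transaction.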
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

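    /// Deserializes the per-writer metadata back into `Add` actions and, if any data was
    /// written in this epoch, appends them to the table in a single Delta commit.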
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.snapshot()?.metadata().partition_columns().clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::debug!("Committed to DeltaLake table in epoch {epoch}, version {version}.");
        Ok(())
    }
}

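/// The per-checkpoint result a writer sends to the coordinator: the `Add` actions produced
/// by flushing, serialized as JSON inside [`SinkMetadata`].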
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use url::Url;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

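    /// End-to-end smoke test against a local (`file://`) Delta table: create the table, write
    /// one chunk through the sink writer, commit via the coordinator, then read it back with
    /// DataFusion and check the row count.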
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit_data(1, vec![metadata]).await.unwrap();

        // Read the table back with DataFusion to verify the data written above. This relies on
        // the `datafusion` feature of `deltalake`, which is already enabled for this crate since
        // the sink implementation itself uses `TableProvider`.
        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(Url::parse(&format!("file://{}", path)).unwrap())
            .await
            .unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}