risingwave_connector/sink/deltalake.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::operations::transaction::CommitBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;

use super::coordinate::CoordinatedLogSinker;
use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::writer::SinkWriter;
use super::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

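/// Common options shared by the Delta Lake sink.
///
/// A minimal, illustrative `CREATE SINK` exercising these options (the sink
/// and source names and all values below are placeholders; `s3.access.key`
/// and `s3.secret.key` come from the flattened [`AwsAuthProps`]):
///
/// ```sql
/// CREATE SINK delta_sink FROM mv WITH (
///     connector = 'deltalake',
///     type = 'append-only',
///     location = 's3a://my-bucket/path/to/table',
///     s3.access.key = 'xxx',
///     s3.secret.key = 'xxx',
///     commit_checkpoint_interval = '10'
/// );
/// ```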
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n (> 0) checkpoints; defaults to 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

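    /// Dispatches on the URL scheme of `location`. For example (illustrative
    /// values; only the `file://` prefix is stripped):
    ///   "s3://bucket/t" or "s3a://bucket/t" -> DeltaTableUrl::S3(..)
    ///   "gs://bucket/t"                     -> DeltaTableUrl::Gcs(..)
    ///   "file:///tmp/t"                     -> DeltaTableUrl::Local("/tmp/t")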
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

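    /// A sketch of the returned `storage_options` map (keys are the
    /// `s3_constants` imported above; values are illustrative):
    ///   AWS_ALLOW_HTTP, AWS_S3_ALLOW_UNSAFE_RENAME -> "true"
    ///   AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY   -> resolved credentials
    ///   AWS_REGION       -> configured region, else DEFAULT_REGION
    ///   AWS_ENDPOINT_URL -> the custom endpoint, when one is configured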
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required with AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

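/// Checks that a RisingWave column type is compatible with the corresponding
/// Delta Lake field type. Primitive types map one-to-one (e.g. `Int16` to
/// `Short`, `Varchar` to `String`, `Timestamptz` to `Timestamp`); structs are
/// compared field-by-field on both name and type, and lists by element type.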
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_, _))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list, dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "DeltaLake sink does not support type {:?}",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

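// This sink is coordinated: each parallel `DeltaLakeSinkWriter` flushes data
// files and reports the resulting `Add` actions, while a single
// `DeltaLakeSinkCommitter` (see below) turns them into Delta log commits.
// `CoordinatedLogSinker` triggers the flush once per
// `commit_checkpoint_interval` checkpoints.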
impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk.clone(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only Delta Lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "column count mismatch: RisingWave has {}, DeltaLake has {}",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type: &&DeltaLakeDataType = deltalake_fields
                .get(&field.name)
                .ok_or_else(|| SinkError::DeltaLake(anyhow!("cannot find field type")))?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} type mismatch: DeltaLake is {:?}, RisingWave is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

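/// Converts a Delta Lake `StructType` schema into the Arrow schema expected by
/// `RecordBatchWriter`, preserving each field's name and nullability.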
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            deltalake::arrow::datatypes::DataType::try_from(field.data_type())
                .context("convert schema error")
                .map_err(SinkError::DeltaLake)?,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

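/// Commit coordinator for the Delta Lake sink. Writers flush data files at
/// checkpoint barriers and ship the resulting `Add` actions as serialized
/// `SinkMetadata`; per epoch, this coordinator folds them into a single Delta
/// commit with `DeltaOperation::Write { mode: SaveMode::Append, .. }`.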
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("cannot deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("cannot create deltalake sink write result from empty data")
        }
    }
}
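
// Illustrative round trip between writer and coordinator, using only the
// conversions defined above:
//   let meta = SinkMetadata::try_from(&DeltaLakeWriteResult { adds })?; // writer, at barrier
//   let restored = DeltaLakeWriteResult::try_from(&meta)?;             // coordinator, in commit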

#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::deltalake::DeltaLakeSinkCommitter;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata]).await.unwrap();

        // The following code reads back the Delta Lake table written with the test
        // data above. It requires `deltalake = { workspace = true, features = ["datafusion"] }`
        // in the `dev-dependencies` section of this crate's `Cargo.toml`.
        //
        // Note that enabling the `datafusion` feature of `deltalake` increases the
        // compile time and the output binary size in release builds, even though
        // it is only a dev dependency.

        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}
701}