risingwave_connector/sink/deltalake.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

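/// Options shared by the DeltaLake sink: the table `location`, the flattened AWS
/// auth properties, an optional GCS service account, and the checkpoint-based
/// commit interval.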
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    /// Commit every n (> 0) checkpoints; the default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
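    /// Open the Delta table at `location`, dispatching on the URL scheme:
    /// `s3://`/`s3a://` builds AWS storage options, `gs://` requires
    /// `gcs.service.account`, and `file://` opens a local table.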
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(&s3_path, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required when using Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

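    /// Build the S3 storage options from the AWS auth properties: static
    /// credentials, a region (falling back to `DEFAULT_REGION`), and an
    /// optional custom endpoint.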
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required with AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

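/// Complete sink configuration: the shared options plus the sink `type`
/// (only append-only variants are accepted; see `validate`).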
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

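/// Check that a RisingWave column type is compatible with the corresponding
/// DeltaLake field type. Structs and lists are checked recursively; unsupported
/// RisingWave types return an error.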
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for the DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only DeltaLake sinks are supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Column count mismatch: the RisingWave schema has {} fields, the DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in the DeltaLake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: DeltaLake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

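/// Per-executor sink writer: converts each `StreamChunk` into an Arrow record
/// batch and buffers it in the Delta `RecordBatchWriter`; on checkpoint
/// barriers the buffered files are flushed and their `Add` actions are
/// reported to the coordinator as sink metadata.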
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

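/// Convert the Delta table's kernel schema into the Arrow schema used when
/// building record batches.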
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let arrow_field_type = deltalake::arrow::datatypes::DataType::try_from(field.data_type())
            .with_context(|| {
                format!(
                    "Failed to convert DeltaLake data type {:?} to Arrow data type for field '{}'",
                    field.data_type(),
                    field.name()
                )
            })
            .map_err(SinkError::DeltaLake)?;
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            arrow_field_type,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

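/// Commit coordinator: gathers the `Add` actions reported by every writer for
/// an epoch and commits them to the Delta log as a single append operation.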
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
        if let Some(add_columns) = add_columns {
            return Err(anyhow!(
                "the DeltaLake sink does not support adding columns, but got: {:?}",
                add_columns
            )
            .into());
        }

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to the DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

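/// The files added by one writer during a checkpoint, serialized into
/// `SinkMetadata` so they can be shipped to the commit coordinator.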
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::writer::SinkWriter;

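    // End-to-end smoke test: create a local Delta table, write one chunk through
    // the sink writer, commit it via the coordinator, then read the rows back
    // with DataFusion.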
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata], None).await.unwrap();

        // The following code reads back the Delta table that was just written with the
        // test data. It requires `deltalake = { workspace = true, features = ["datafusion"] }`
        // in the `dev-dependencies` section of this crate's `Cargo.toml`.
        //
        // Note that enabling the `datafusion` feature of `deltalake` increases the compile
        // time and the output binary size in release builds, even though it is only a
        // dev dependency.

        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}