use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

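/// Options shared by all DeltaLake sink configurations: the table `location`, the
/// flattened AWS auth properties, an optional GCS service account key, and the number
/// of checkpoints to accumulate between Delta commits.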
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

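// Credential-bearing options: `gcs.service.account` is enforced as a secret here,
// while the AWS properties are delegated to `AwsAuthProps::enforce_one`.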
impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
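    /// Opens the Delta table at `self.location`, registering the object-store handlers
    /// and building the storage options for S3, GCS, or the local filesystem as needed.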
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(&s3_path, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

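    /// Classifies the `location` by scheme: `s3://`/`s3a://`, `gs://`, or `file://`.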
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

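    /// Resolves AWS credentials, region, and endpoint from `aws_auth_props` into the
    /// string map of storage options that `deltalake` expects for S3-backed tables.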
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

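/// The storage backend derived from the `location` prefix.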
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

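/// Full sink configuration: the shared options plus the sink `type`
/// (only append-only variants are accepted; see `validate`).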
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

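/// Checks that a RisingWave column type maps onto the corresponding Delta Lake type
/// (e.g. `Int16` -> `Short`, `Varchar` -> `String`, `Timestamptz` -> `Timestamp`),
/// recursing into structs and lists. Unsupported RisingWave types are rejected.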
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

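// The sink is coordinated: each writer flushes Parquet files and ships the resulting
// `Add` actions as serialized metadata, and `DeltaLakeSinkCommitter` turns them into a
// single Delta commit per checkpoint.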
impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

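/// Streaming writer: converts `StreamChunk`s into Arrow record batches and buffers them
/// in a `RecordBatchWriter`; flushed files are reported at checkpoint barriers.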
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

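    /// Converts the chunk into an Arrow `RecordBatch` using the table's Arrow schema and
    /// appends it to the in-memory `RecordBatchWriter`.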
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

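/// Builds the Arrow schema corresponding to the Delta table's `StructType`, preserving
/// field names and nullability.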
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let arrow_field_type = deltalake::arrow::datatypes::DataType::try_from(field.data_type())
            .with_context(|| {
                format!(
                    "Failed to convert DeltaLake data type {:?} to Arrow data type for field '{}'",
                    field.data_type(),
                    field.name()
                )
            })
            .map_err(SinkError::DeltaLake)?;
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            arrow_field_type,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

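// Data is only flushed on checkpoint barriers: `barrier(true)` flushes the buffered
// batches and returns the resulting `Add` actions as serialized sink metadata for the
// commit coordinator; non-checkpoint barriers return `None`.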
#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

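/// Commit coordinator: collects the `Add` actions reported by all writers for an epoch
/// and commits them to the Delta table as a single append transaction.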
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
        if let Some(add_columns) = add_columns {
            return Err(anyhow!(
                "DeltaLake sink does not support adding columns, but got: {:?}",
                add_columns
            )
            .into());
        }

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

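/// The `Add` actions produced by one writer's flush, serialized into `SinkMetadata` as
/// JSON so the coordinator can deserialize and commit them.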
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata], None).await.unwrap();

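        // Read the table back through DataFusion to verify the committed rows.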
        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}