use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::datafusion::datasource::TableProvider;
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

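/// Options shared by every DeltaLake sink: the table `location`, the flattened AWS
/// authentication properties, an optional GCS service-account key, and how many
/// checkpoints to accumulate between Delta commits.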
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
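    /// Opens the Delta table at `location`. Depending on the URI scheme this registers the
    /// S3 or GCS object-store handlers and passes the corresponding storage options to
    /// delta-rs before loading the table.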
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required when writing to Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

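    /// Classifies the configured `location` by its URI scheme: `s3://`/`s3a://` map to S3,
    /// `gs://` to GCS, and `file://` to a local path (with the scheme stripped).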
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

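    /// Builds the storage-option map handed to delta-rs for S3: static credentials resolved
    /// from the AWS SDK config, the region (falling back to [`DEFAULT_REGION`]) and, if
    /// configured, a custom endpoint. At least one of region or endpoint must be present.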
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

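/// The supported location kinds, derived from the URI scheme of `location`.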
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

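/// Full sink configuration: the shared [`DeltaLakeCommon`] options plus the sink `type`,
/// which must be append-only (see `validate`).
///
/// An illustrative `CREATE SINK` WITH clause (option names are taken from this module;
/// the bucket and credentials are placeholders):
///
/// ```sql
/// CREATE SINK delta_sink FROM mv WITH (
///     connector = 'deltalake',
///     type = 'append-only',
///     location = 's3a://my-bucket/path/to/table',
///     s3.access.key = '...',
///     s3.secret.key = '...',
///     commit_checkpoint_interval = '10'
/// );
/// ```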
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

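/// Checks that a RisingWave column type is compatible with the corresponding Delta type.
/// Primitive mappings: Boolean→Boolean, Int16→Short, Int32→Integer, Int64→Long,
/// Float32→Float, Float64→Double, Decimal→Decimal, Date→Date, Varchar→String,
/// Timestamptz→Timestamp. Structs and lists are checked recursively (struct field names
/// must also match); any other RisingWave type is rejected.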
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

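// The sink is checkpoint-decoupled and coordinated: each writer buffers rows locally and
// only reports flushed `Add` actions at checkpoint barriers; a single-phase coordinator
// turns the collected actions into one Delta commit per epoch.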
impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only DeltaLake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in DeltaLake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: DeltaLake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        };
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

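/// Per-parallelism writer: converts incoming `StreamChunk`s to Arrow record batches and
/// buffers them in a delta-rs [`RecordBatchWriter`] until the next checkpoint barrier.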
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        // Arrow schema used to encode chunks; taken from the record-batch writer, which
        // derives it from the Delta table's schema.
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> = writer.arrow_schema();

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

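    /// Converts the chunk to an Arrow `RecordBatch` using the table's Arrow schema and
    /// hands it to the buffered delta-rs writer.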
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

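// Writer lifecycle: `write_batch` only buffers data; on a checkpoint barrier the buffered
// files are flushed and the resulting `Add` actions are serialized into `SinkMetadata`,
// which the commit coordinator later turns into a Delta commit.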
#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

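/// Single-phase commit coordinator: gathers the `Add` actions reported by all writers for
/// an epoch and publishes them as one append commit to the Delta log.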
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

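    // Commit flow: reject schema changes, deserialize each writer's `Add` actions, skip the
    // epoch if nothing was written, then append the actions via `CommitBuilder` and refresh
    // the local table state.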
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
        if let Some(schema_change) = schema_change {
            return Err(anyhow!(
                "DeltaLake sink does not support schema changes, but got: {:?}",
                schema_change
            )
            .into());
        }

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.snapshot()?.metadata().partition_columns().clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

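/// The per-writer payload carried inside `SinkMetadata`: the `Add` actions produced by
/// flushing the record-batch writer, serialized as JSON.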
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("cannot deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("cannot create deltalake sink write result from empty metadata")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

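// End-to-end smoke test: create a local Delta table, push one chunk through the sink
// writer, commit it via the coordinator, and read the rows back with DataFusion.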
#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use url::Url;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata], None).await.unwrap();

        // Read the committed data back through DataFusion and check the row count.
        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(Url::parse(&format!("file://{}", path)).unwrap())
            .await
            .unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}