1#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60pub trait ToArrow {
65 fn to_record_batch(
69 &self,
70 schema: arrow_schema::SchemaRef,
71 chunk: &DataChunk,
72 ) -> Result<arrow_array::RecordBatch, ArrayError> {
73 if !chunk.is_compacted() {
75 let c = chunk.clone();
76 return self.to_record_batch(schema, &c.compact());
77 }
78
79 let columns: Vec<_> = chunk
81 .columns()
82 .iter()
83 .zip_eq_fast(schema.fields().iter())
84 .map(|(column, field)| self.to_array(field.data_type(), column))
85 .try_collect()?;
86
87 let opts =
89 arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90 arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91 .map_err(ArrayError::to_arrow)
92 }
93
94 fn to_array(
96 &self,
97 data_type: &arrow_schema::DataType,
98 array: &ArrayImpl,
99 ) -> Result<arrow_array::ArrayRef, ArrayError> {
100 let arrow_array = match array {
101 ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102 ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103 ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104 ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105 ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106 ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107 ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108 ArrayImpl::Date(array) => self.date_to_arrow(array),
109 ArrayImpl::Time(array) => self.time_to_arrow(array),
110 ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111 ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112 ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113 ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114 ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115 ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116 ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117 ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118 ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119 ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120 ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121 ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122 }?;
123 if arrow_array.data_type() != data_type {
124 arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125 } else {
126 Ok(arrow_array)
127 }
128 }
129
130 #[inline]
131 fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132 Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133 }
134
135 #[inline]
136 fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137 Ok(Arc::new(arrow_array::Int16Array::from(array)))
138 }
139
140 #[inline]
141 fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142 Ok(Arc::new(arrow_array::Int32Array::from(array)))
143 }
144
145 #[inline]
146 fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147 Ok(Arc::new(arrow_array::Int64Array::from(array)))
148 }
149
150 #[inline]
151 fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152 Ok(Arc::new(arrow_array::Float32Array::from(array)))
153 }
154
155 #[inline]
156 fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157 Ok(Arc::new(arrow_array::Float64Array::from(array)))
158 }
159
160 #[inline]
161 fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162 Ok(Arc::new(arrow_array::StringArray::from(array)))
163 }
164
165 #[inline]
166 fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167 Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168 }
169
170 #[inline]
171 fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172 Ok(Arc::new(arrow_array::Date32Array::from(array)))
173 }
174
175 #[inline]
176 fn timestamp_to_arrow(
177 &self,
178 array: &TimestampArray,
179 ) -> Result<arrow_array::ArrayRef, ArrayError> {
180 Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181 array,
182 )))
183 }
184
185 #[inline]
186 fn timestamptz_to_arrow(
187 &self,
188 array: &TimestamptzArray,
189 ) -> Result<arrow_array::ArrayRef, ArrayError> {
190 Ok(Arc::new(
191 arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192 ))
193 }
194
195 #[inline]
196 fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197 Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198 }
199
200 #[inline]
201 fn interval_to_arrow(
202 &self,
203 array: &IntervalArray,
204 ) -> Result<arrow_array::ArrayRef, ArrayError> {
205 Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206 array,
207 )))
208 }
209
210 #[inline]
211 fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212 Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213 }
214
215 #[inline]
217 fn decimal_to_arrow(
218 &self,
219 _data_type: &arrow_schema::DataType,
220 array: &DecimalArray,
221 ) -> Result<arrow_array::ArrayRef, ArrayError> {
222 Ok(Arc::new(arrow_array::StringArray::from(array)))
223 }
224
225 #[inline]
227 fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228 Ok(Arc::new(arrow_array::StringArray::from(array)))
229 }
230
231 #[inline]
232 fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233 Ok(Arc::new(arrow_array::Int64Array::from(array)))
234 }
235
236 #[inline]
237 fn list_to_arrow(
238 &self,
239 data_type: &arrow_schema::DataType,
240 array: &ListArray,
241 ) -> Result<arrow_array::ArrayRef, ArrayError> {
242 let arrow_schema::DataType::List(field) = data_type else {
243 return Err(ArrayError::to_arrow("Invalid list type"));
244 };
245 let values = self.to_array(field.data_type(), array.values())?;
246 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248 Ok(Arc::new(arrow_array::ListArray::new(
249 field.clone(),
250 offsets,
251 values,
252 nulls,
253 )))
254 }
255
256 #[inline]
257 fn vector_to_arrow(
258 &self,
259 data_type: &arrow_schema::DataType,
260 array: &VectorArray,
261 ) -> Result<arrow_array::ArrayRef, ArrayError> {
262 let arrow_schema::DataType::List(field) = data_type else {
263 return Err(ArrayError::to_arrow("Invalid list type"));
264 };
265 if field.data_type() != &arrow_schema::DataType::Float32 {
266 return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267 }
268 let values = Arc::new(arrow_array::Float32Array::from(
269 array.as_raw_slice().to_vec(),
270 ));
271 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273 Ok(Arc::new(arrow_array::ListArray::new(
274 field.clone(),
275 offsets,
276 values,
277 nulls,
278 )))
279 }
280
281 #[inline]
282 fn struct_to_arrow(
283 &self,
284 data_type: &arrow_schema::DataType,
285 array: &StructArray,
286 ) -> Result<arrow_array::ArrayRef, ArrayError> {
287 let arrow_schema::DataType::Struct(fields) = data_type else {
288 return Err(ArrayError::to_arrow("Invalid struct type"));
289 };
290 Ok(Arc::new(arrow_array::StructArray::new(
291 fields.clone(),
292 array
293 .fields()
294 .zip_eq_fast(fields)
295 .map(|(arr, field)| self.to_array(field.data_type(), arr))
296 .try_collect::<_, _, ArrayError>()?,
297 Some(array.null_bitmap().into()),
298 )))
299 }
300
301 #[inline]
302 fn map_to_arrow(
303 &self,
304 data_type: &arrow_schema::DataType,
305 array: &MapArray,
306 ) -> Result<arrow_array::ArrayRef, ArrayError> {
307 let arrow_schema::DataType::Map(field, ordered) = data_type else {
308 return Err(ArrayError::to_arrow("Invalid map type"));
309 };
310 if *ordered {
311 return Err(ArrayError::to_arrow("Sorted map is not supported"));
312 }
313 let values = self
314 .struct_to_arrow(field.data_type(), array.as_struct())?
315 .as_struct()
316 .clone();
317 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
318 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
319 Ok(Arc::new(arrow_array::MapArray::new(
320 field.clone(),
321 offsets,
322 values,
323 nulls,
324 *ordered,
325 )))
326 }
327
328 fn to_arrow_field(
333 &self,
334 name: &str,
335 value: &DataType,
336 ) -> Result<arrow_schema::Field, ArrayError> {
337 let data_type = match value {
338 DataType::Boolean => self.bool_type_to_arrow(),
340 DataType::Int16 => self.int16_type_to_arrow(),
341 DataType::Int32 => self.int32_type_to_arrow(),
342 DataType::Int64 => self.int64_type_to_arrow(),
343 DataType::Int256 => self.int256_type_to_arrow(),
344 DataType::Float32 => self.float32_type_to_arrow(),
345 DataType::Float64 => self.float64_type_to_arrow(),
346 DataType::Date => self.date_type_to_arrow(),
347 DataType::Time => self.time_type_to_arrow(),
348 DataType::Timestamp => self.timestamp_type_to_arrow(),
349 DataType::Timestamptz => self.timestamptz_type_to_arrow(),
350 DataType::Interval => self.interval_type_to_arrow(),
351 DataType::Varchar => self.varchar_type_to_arrow(),
352 DataType::Bytea => self.bytea_type_to_arrow(),
353 DataType::Serial => self.serial_type_to_arrow(),
354 DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
355 DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
356 DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
357 DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
358 DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
359 DataType::Vector(_) => self.list_type_to_arrow(&VECTOR_ITEM_TYPE)?,
360 };
361 Ok(arrow_schema::Field::new(name, data_type, true))
362 }
363
364 #[inline]
365 fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
366 arrow_schema::DataType::Boolean
367 }
368
369 #[inline]
370 fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
371 arrow_schema::DataType::Int16
372 }
373
374 #[inline]
375 fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
376 arrow_schema::DataType::Int32
377 }
378
379 #[inline]
380 fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
381 arrow_schema::DataType::Int64
382 }
383
384 #[inline]
385 fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
386 arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
387 }
388
389 #[inline]
390 fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
391 arrow_schema::DataType::Float32
392 }
393
394 #[inline]
395 fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
396 arrow_schema::DataType::Float64
397 }
398
399 #[inline]
400 fn date_type_to_arrow(&self) -> arrow_schema::DataType {
401 arrow_schema::DataType::Date32
402 }
403
404 #[inline]
405 fn time_type_to_arrow(&self) -> arrow_schema::DataType {
406 arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
407 }
408
409 #[inline]
410 fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
411 arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
412 }
413
414 #[inline]
415 fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
416 arrow_schema::DataType::Timestamp(
417 arrow_schema::TimeUnit::Microsecond,
418 Some("+00:00".into()),
419 )
420 }
421
422 #[inline]
423 fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
424 arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
425 }
426
427 #[inline]
428 fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
429 arrow_schema::DataType::Utf8
430 }
431
432 #[inline]
433 fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
434 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
435 .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
436 }
437
438 #[inline]
439 fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
440 arrow_schema::DataType::Binary
441 }
442
443 #[inline]
444 fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
445 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
446 .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
447 }
448
449 #[inline]
450 fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
451 arrow_schema::DataType::Int64
452 }
453
454 #[inline]
455 fn list_type_to_arrow(
456 &self,
457 elem_type: &DataType,
458 ) -> Result<arrow_schema::DataType, ArrayError> {
459 Ok(arrow_schema::DataType::List(Arc::new(
460 self.to_arrow_field("item", elem_type)?,
461 )))
462 }
463
464 #[inline]
465 fn struct_type_to_arrow(
466 &self,
467 fields: &StructType,
468 ) -> Result<arrow_schema::DataType, ArrayError> {
469 Ok(arrow_schema::DataType::Struct(
470 fields
471 .iter()
472 .map(|(name, ty)| self.to_arrow_field(name, ty))
473 .try_collect::<_, _, ArrayError>()?,
474 ))
475 }
476
477 #[inline]
478 fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
479 let sorted = false;
480 let key = self
482 .to_arrow_field("key", map_type.key())?
483 .with_nullable(false);
484 let value = self.to_arrow_field("value", map_type.value())?;
485 Ok(arrow_schema::DataType::Map(
486 Arc::new(arrow_schema::Field::new(
487 "entries",
488 arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
489 false,
491 )),
492 sorted,
493 ))
494 }
495}
496
497#[allow(clippy::wrong_self_convention)]
499pub trait FromArrow {
500 fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
502 let mut columns = Vec::with_capacity(batch.num_columns());
503 for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
504 let column = Arc::new(self.from_array(field, array)?);
505 columns.push(column);
506 }
507 Ok(DataChunk::new(columns, batch.num_rows()))
508 }
509
510 fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
512 Ok(StructType::new(
513 fields
514 .iter()
515 .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
516 .try_collect::<_, Vec<_>, ArrayError>()?,
517 ))
518 }
519
520 fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
522 use arrow_schema::DataType::*;
523 use arrow_schema::IntervalUnit::*;
524 use arrow_schema::TimeUnit::*;
525
526 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
528 return self.from_extension_type(type_name, field.data_type());
529 }
530
531 Ok(match field.data_type() {
532 Boolean => DataType::Boolean,
533 Int16 => DataType::Int16,
534 Int32 => DataType::Int32,
535 Int64 => DataType::Int64,
536 Int8 => DataType::Int16,
537 UInt8 => DataType::Int16,
538 UInt16 => DataType::Int32,
539 UInt32 => DataType::Int64,
540 UInt64 => DataType::Decimal,
541 Float16 => DataType::Float32,
542 Float32 => DataType::Float32,
543 Float64 => DataType::Float64,
544 Decimal128(_, _) => DataType::Decimal,
545 Decimal256(_, _) => DataType::Int256,
546 Date32 => DataType::Date,
547 Time64(Microsecond) => DataType::Time,
548 Timestamp(Microsecond, None) => DataType::Timestamp,
549 Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
550 Timestamp(Second, None) => DataType::Timestamp,
551 Timestamp(Second, Some(_)) => DataType::Timestamptz,
552 Timestamp(Millisecond, None) => DataType::Timestamp,
553 Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
554 Timestamp(Nanosecond, None) => DataType::Timestamp,
555 Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
556 Interval(MonthDayNano) => DataType::Interval,
557 Utf8 => DataType::Varchar,
558 Binary => DataType::Bytea,
559 LargeUtf8 => self.from_large_utf8()?,
560 LargeBinary => self.from_large_binary()?,
561 List(field) => DataType::List(Box::new(self.from_field(field)?)),
562 Struct(fields) => DataType::Struct(self.from_fields(fields)?),
563 Map(field, _is_sorted) => {
564 let entries = self.from_field(field)?;
565 DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
566 ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
567 })?)
568 }
569 t => {
570 return Err(ArrayError::from_arrow(format!(
571 "unsupported arrow data type: {t:?}"
572 )));
573 }
574 })
575 }
576
577 fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
579 Ok(DataType::Varchar)
580 }
581
582 fn from_large_binary(&self) -> Result<DataType, ArrayError> {
584 Ok(DataType::Bytea)
585 }
586
587 fn from_extension_type(
589 &self,
590 type_name: &str,
591 physical_type: &arrow_schema::DataType,
592 ) -> Result<DataType, ArrayError> {
593 match (type_name, physical_type) {
594 ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
595 ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
596 _ => Err(ArrayError::from_arrow(format!(
597 "unsupported extension type: {type_name:?}"
598 ))),
599 }
600 }
601
602 fn from_array(
604 &self,
605 field: &arrow_schema::Field,
606 array: &arrow_array::ArrayRef,
607 ) -> Result<ArrayImpl, ArrayError> {
608 use arrow_schema::DataType::*;
609 use arrow_schema::IntervalUnit::*;
610 use arrow_schema::TimeUnit::*;
611
612 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
614 return self.from_extension_array(type_name, array);
615 }
616 match array.data_type() {
617 Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
618 Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
619 Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
620 Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
621 Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
622 UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
623 UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
624 UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
625
626 UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
627 Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
628 Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
629 Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
630 Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
631 Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
632 Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
633 Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
634 Timestamp(Second, None) => {
635 self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
636 }
637 Timestamp(Second, Some(_)) => {
638 self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
639 }
640 Timestamp(Millisecond, None) => {
641 self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
642 }
643 Timestamp(Millisecond, Some(_)) => {
644 self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
645 }
646 Timestamp(Microsecond, None) => {
647 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
648 }
649 Timestamp(Microsecond, Some(_)) => {
650 self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
651 }
652 Timestamp(Nanosecond, None) => {
653 self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
654 }
655 Timestamp(Nanosecond, Some(_)) => {
656 self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
657 }
658 Interval(MonthDayNano) => {
659 self.from_interval_array(array.as_any().downcast_ref().unwrap())
660 }
661 Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
662 Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
663 LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
664 LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
665 List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
666 Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
667 Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
668 t => Err(ArrayError::from_arrow(format!(
669 "unsupported arrow data type: {t:?}",
670 ))),
671 }
672 }
673
674 fn from_extension_array(
676 &self,
677 type_name: &str,
678 array: &arrow_array::ArrayRef,
679 ) -> Result<ArrayImpl, ArrayError> {
680 match type_name {
681 "arrowudf.decimal" => {
682 let array: &arrow_array::StringArray =
683 array.as_any().downcast_ref().ok_or_else(|| {
684 ArrayError::from_arrow(
685 "expected string array for `arrowudf.decimal`".to_owned(),
686 )
687 })?;
688 Ok(ArrayImpl::Decimal(array.try_into()?))
689 }
690 "arrowudf.json" => {
691 let array: &arrow_array::StringArray =
692 array.as_any().downcast_ref().ok_or_else(|| {
693 ArrayError::from_arrow(
694 "expected string array for `arrowudf.json`".to_owned(),
695 )
696 })?;
697 Ok(ArrayImpl::Jsonb(array.try_into()?))
698 }
699 _ => Err(ArrayError::from_arrow(format!(
700 "unsupported extension type: {type_name:?}"
701 ))),
702 }
703 }
704
705 fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
706 Ok(ArrayImpl::Bool(array.into()))
707 }
708
709 fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
710 Ok(ArrayImpl::Int16(array.into()))
711 }
712
713 fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
714 Ok(ArrayImpl::Int16(array.into()))
715 }
716
717 fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
718 Ok(ArrayImpl::Int16(array.into()))
719 }
720
721 fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
722 Ok(ArrayImpl::Int32(array.into()))
723 }
724
725 fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
726 Ok(ArrayImpl::Int64(array.into()))
727 }
728
729 fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
730 Ok(ArrayImpl::Int32(array.into()))
731 }
732
733 fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
734 Ok(ArrayImpl::Int64(array.into()))
735 }
736
737 fn from_int256_array(
738 &self,
739 array: &arrow_array::Decimal256Array,
740 ) -> Result<ArrayImpl, ArrayError> {
741 Ok(ArrayImpl::Int256(array.into()))
742 }
743
744 fn from_decimal128_array(
745 &self,
746 array: &arrow_array::Decimal128Array,
747 ) -> Result<ArrayImpl, ArrayError> {
748 Ok(ArrayImpl::Decimal(array.try_into()?))
749 }
750
751 fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
752 Ok(ArrayImpl::Decimal(array.try_into()?))
753 }
754
755 fn from_float16_array(
756 &self,
757 array: &arrow_array::Float16Array,
758 ) -> Result<ArrayImpl, ArrayError> {
759 Ok(ArrayImpl::Float32(array.try_into()?))
760 }
761
762 fn from_float32_array(
763 &self,
764 array: &arrow_array::Float32Array,
765 ) -> Result<ArrayImpl, ArrayError> {
766 Ok(ArrayImpl::Float32(array.into()))
767 }
768
769 fn from_float64_array(
770 &self,
771 array: &arrow_array::Float64Array,
772 ) -> Result<ArrayImpl, ArrayError> {
773 Ok(ArrayImpl::Float64(array.into()))
774 }
775
776 fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
777 Ok(ArrayImpl::Date(array.into()))
778 }
779
780 fn from_time64us_array(
781 &self,
782 array: &arrow_array::Time64MicrosecondArray,
783 ) -> Result<ArrayImpl, ArrayError> {
784 Ok(ArrayImpl::Time(array.into()))
785 }
786
787 fn from_timestampsecond_array(
788 &self,
789 array: &arrow_array::TimestampSecondArray,
790 ) -> Result<ArrayImpl, ArrayError> {
791 Ok(ArrayImpl::Timestamp(array.into()))
792 }
793 fn from_timestampsecond_some_array(
794 &self,
795 array: &arrow_array::TimestampSecondArray,
796 ) -> Result<ArrayImpl, ArrayError> {
797 Ok(ArrayImpl::Timestamptz(array.into()))
798 }
799
800 fn from_timestampms_array(
801 &self,
802 array: &arrow_array::TimestampMillisecondArray,
803 ) -> Result<ArrayImpl, ArrayError> {
804 Ok(ArrayImpl::Timestamp(array.into()))
805 }
806
807 fn from_timestampms_some_array(
808 &self,
809 array: &arrow_array::TimestampMillisecondArray,
810 ) -> Result<ArrayImpl, ArrayError> {
811 Ok(ArrayImpl::Timestamptz(array.into()))
812 }
813
814 fn from_timestampus_array(
815 &self,
816 array: &arrow_array::TimestampMicrosecondArray,
817 ) -> Result<ArrayImpl, ArrayError> {
818 Ok(ArrayImpl::Timestamp(array.into()))
819 }
820
821 fn from_timestampus_some_array(
822 &self,
823 array: &arrow_array::TimestampMicrosecondArray,
824 ) -> Result<ArrayImpl, ArrayError> {
825 Ok(ArrayImpl::Timestamptz(array.into()))
826 }
827
828 fn from_timestampns_array(
829 &self,
830 array: &arrow_array::TimestampNanosecondArray,
831 ) -> Result<ArrayImpl, ArrayError> {
832 Ok(ArrayImpl::Timestamp(array.into()))
833 }
834
835 fn from_timestampns_some_array(
836 &self,
837 array: &arrow_array::TimestampNanosecondArray,
838 ) -> Result<ArrayImpl, ArrayError> {
839 Ok(ArrayImpl::Timestamptz(array.into()))
840 }
841
842 fn from_interval_array(
843 &self,
844 array: &arrow_array::IntervalMonthDayNanoArray,
845 ) -> Result<ArrayImpl, ArrayError> {
846 Ok(ArrayImpl::Interval(array.into()))
847 }
848
849 fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
850 Ok(ArrayImpl::Utf8(array.into()))
851 }
852
853 fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
854 Ok(ArrayImpl::Bytea(array.into()))
855 }
856
857 fn from_large_utf8_array(
858 &self,
859 array: &arrow_array::LargeStringArray,
860 ) -> Result<ArrayImpl, ArrayError> {
861 Ok(ArrayImpl::Utf8(array.into()))
862 }
863
864 fn from_large_binary_array(
865 &self,
866 array: &arrow_array::LargeBinaryArray,
867 ) -> Result<ArrayImpl, ArrayError> {
868 Ok(ArrayImpl::Bytea(array.into()))
869 }
870
871 fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
872 use arrow_array::Array;
873 let arrow_schema::DataType::List(field) = array.data_type() else {
874 panic!("nested field types cannot be determined.");
875 };
876 Ok(ArrayImpl::List(ListArray {
877 value: Box::new(self.from_array(field, array.values())?),
878 bitmap: match array.nulls() {
879 Some(nulls) => nulls.iter().collect(),
880 None => Bitmap::ones(array.len()),
881 },
882 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
883 }))
884 }
885
886 fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
887 use arrow_array::Array;
888 let arrow_schema::DataType::Struct(fields) = array.data_type() else {
889 panic!("nested field types cannot be determined.");
890 };
891 Ok(ArrayImpl::Struct(StructArray::new(
892 self.from_fields(fields)?,
893 array
894 .columns()
895 .iter()
896 .zip_eq_fast(fields)
897 .map(|(array, field)| self.from_array(field, array).map(Arc::new))
898 .try_collect()?,
899 (0..array.len()).map(|i| array.is_valid(i)).collect(),
900 )))
901 }
902
903 fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
904 use arrow_array::Array;
905 let struct_array = self.from_struct_array(array.entries())?;
906 let list_array = ListArray {
907 value: Box::new(struct_array),
908 bitmap: match array.nulls() {
909 Some(nulls) => nulls.iter().collect(),
910 None => Bitmap::ones(array.len()),
911 },
912 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
913 };
914
915 Ok(ArrayImpl::Map(MapArray { inner: list_array }))
916 }
917}
918
919impl From<&Bitmap> for arrow_buffer::NullBuffer {
920 fn from(bitmap: &Bitmap) -> Self {
921 bitmap.iter().collect()
922 }
923}
924
/// Implements `From` conversions in both directions between a RisingWave array type and an
/// Arrow array type, plus `From<&[Arrow]>`, which concatenates several Arrow arrays into a
/// single RisingWave array.
///
/// - `converts!(RwArray, ArrowArray)`: elements are collected as-is, so both sides must
///   iterate over the same `Option<_>` item type.
/// - `converts!(RwArray, ArrowArray, @map)`: every non-null element goes through
///   `FromIntoArrow` (`into_arrow` towards Arrow, `from_arrow` back).
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                let items = array.iter();
                items.collect()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                let items = array.iter();
                items.collect()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                let items = arrays.iter().flat_map(|part| part.iter());
                items.collect()
            }
        }
    };
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                let converted = array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow()));
                converted.collect()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                let converted = array.iter().map(|item| {
                    item.map(|value| {
                        <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                    })
                });
                converted.collect()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                let converted = arrays.iter().flat_map(|part| part.iter()).map(|item| {
                    item.map(|value| {
                        <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                    })
                });
                converted.collect()
            }
        }
    };
}
978
/// Implements `From` conversions between a RisingWave primitive array and an Arrow
/// primitive array whose native value types differ, plus `From<&[Arrow]>`, which
/// concatenates several Arrow arrays into one RisingWave array.
///
/// - `$FromType`: native value type of `$ArrayType` (RisingWave side).
/// - `$ToType`: native value type of `$ArrowType` (Arrow side).
///
/// Values are converted with `as`. That is lossless when widening (e.g. `u8` → `i16`), but
/// it truncates or wraps when the target type is narrower or has different signedness
/// (e.g. `i16` → `u8` maps `-1` to `255`).
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                // Collect directly; no intermediate `Vec` is needed.
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
1009
/// Implements `From` conversions between a RisingWave timestamp-like array and an Arrow
/// timestamp array of one fixed time unit, plus `From<&[Arrow]>`, which concatenates
/// several Arrow arrays into one RisingWave array.
///
/// `$time_unit` is passed with every non-null element to `FromIntoArrowWithUnit`
/// (`into_arrow_with_unit` towards Arrow, `from_arrow_with_unit` back). The trailing
/// `@map` token is required by the matcher.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|o| o.map(|v| v.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter())
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1046
1047converts!(BoolArray, arrow_array::BooleanArray);
1048converts!(I16Array, arrow_array::Int16Array);
1049converts!(I32Array, arrow_array::Int32Array);
1050converts!(I64Array, arrow_array::Int64Array);
1051converts!(F32Array, arrow_array::Float32Array, @map);
1052converts!(F64Array, arrow_array::Float64Array, @map);
1053converts!(BytesArray, arrow_array::BinaryArray);
1054converts!(BytesArray, arrow_array::LargeBinaryArray);
1055converts!(Utf8Array, arrow_array::StringArray);
1056converts!(Utf8Array, arrow_array::LargeStringArray);
1057converts!(DateArray, arrow_array::Date32Array, @map);
1058converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
1059converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
1060converts!(SerialArray, arrow_array::Int64Array, @map);
1061
1062converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
1063converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
1064converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
1065converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);
1066
1067converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1068converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
1069converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1070converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1071
1072converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1073converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
1074converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1075converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1076
/// Value-level conversion between a RisingWave scalar and its arrow native representation.
///
/// Implemented for scalars whose arrow encoding does not depend on any extra parameter.
trait FromIntoArrow {
    /// The arrow native value this scalar is stored as.
    type ArrowType;

    /// Converts this scalar into its arrow native value.
    fn into_arrow(self) -> Self::ArrowType;

    /// Builds the scalar from its arrow native value.
    fn from_arrow(value: Self::ArrowType) -> Self;
}
1084
/// Value-level conversion between a RisingWave scalar and an arrow native value whose
/// meaning depends on an extra unit parameter (e.g. the resolution of a timestamp).
trait FromIntoArrowWithUnit {
    /// The arrow native value this scalar is stored as.
    type ArrowType;
    /// The unit that qualifies `ArrowType` (for timestamps: the arrow time unit).
    type TimestampType;

    /// Converts this scalar into an arrow native value expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;

    /// Builds the scalar from an arrow native value expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
}
1094
1095impl FromIntoArrow for Serial {
1096 type ArrowType = i64;
1097
1098 fn from_arrow(value: Self::ArrowType) -> Self {
1099 value.into()
1100 }
1101
1102 fn into_arrow(self) -> Self::ArrowType {
1103 self.into()
1104 }
1105}
1106
1107impl FromIntoArrow for F32 {
1108 type ArrowType = f32;
1109
1110 fn from_arrow(value: Self::ArrowType) -> Self {
1111 value.into()
1112 }
1113
1114 fn into_arrow(self) -> Self::ArrowType {
1115 self.into()
1116 }
1117}
1118
1119impl FromIntoArrow for F64 {
1120 type ArrowType = f64;
1121
1122 fn from_arrow(value: Self::ArrowType) -> Self {
1123 value.into()
1124 }
1125
1126 fn into_arrow(self) -> Self::ArrowType {
1127 self.into()
1128 }
1129}
1130
1131impl FromIntoArrow for Date {
1132 type ArrowType = i32;
1133
1134 fn from_arrow(value: Self::ArrowType) -> Self {
1135 Date(arrow_array::types::Date32Type::to_naive_date(value))
1136 }
1137
1138 fn into_arrow(self) -> Self::ArrowType {
1139 arrow_array::types::Date32Type::from_naive_date(self.0)
1140 }
1141}
1142
1143impl FromIntoArrow for Time {
1144 type ArrowType = i64;
1145
1146 fn from_arrow(value: Self::ArrowType) -> Self {
1147 Time(
1148 NaiveTime::from_num_seconds_from_midnight_opt(
1149 (value / 1_000_000) as _,
1150 (value % 1_000_000 * 1000) as _,
1151 )
1152 .unwrap(),
1153 )
1154 }
1155
1156 fn into_arrow(self) -> Self::ArrowType {
1157 self.0
1158 .signed_duration_since(NaiveTime::default())
1159 .num_microseconds()
1160 .unwrap()
1161 }
1162}
1163
1164impl FromIntoArrowWithUnit for Timestamp {
1165 type ArrowType = i64;
1166 type TimestampType = TimeUnit;
1167
1168 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1169 match time_unit {
1170 TimeUnit::Second => {
1171 Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1172 }
1173 TimeUnit::Millisecond => {
1174 Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1175 }
1176 TimeUnit::Microsecond => {
1177 Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1178 }
1179 TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1180 }
1181 }
1182
1183 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1184 match time_unit {
1185 TimeUnit::Second => self.0.and_utc().timestamp(),
1186 TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1187 TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1188 TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1189 }
1190 }
1191}
1192
1193impl FromIntoArrowWithUnit for Timestamptz {
1194 type ArrowType = i64;
1195 type TimestampType = TimeUnit;
1196
1197 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1198 match time_unit {
1199 TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1200 TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1201 TimeUnit::Microsecond => Timestamptz::from_micros(value),
1202 TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1203 }
1204 }
1205
1206 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1207 match time_unit {
1208 TimeUnit::Second => self.timestamp(),
1209 TimeUnit::Millisecond => self.timestamp_millis(),
1210 TimeUnit::Microsecond => self.timestamp_micros(),
1211 TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1212 }
1213 }
1214}
1215
1216impl FromIntoArrow for Interval {
1217 type ArrowType = ArrowIntervalType;
1218
1219 fn from_arrow(value: Self::ArrowType) -> Self {
1220 Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1221 }
1222
1223 fn into_arrow(self) -> Self::ArrowType {
1224 ArrowIntervalType {
1225 months: self.months(),
1226 days: self.days(),
1227 nanoseconds: self.usecs() * 1000,
1229 }
1230 }
1231}
1232
1233impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1234 fn from(array: &DecimalArray) -> Self {
1235 let mut builder =
1236 arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1237 for value in array.iter() {
1238 builder.append_option(value.map(|d| d.to_string()));
1239 }
1240 builder.finish()
1241 }
1242}
1243
1244impl From<&DecimalArray> for arrow_array::StringArray {
1245 fn from(array: &DecimalArray) -> Self {
1246 let mut builder =
1247 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1248 for value in array.iter() {
1249 builder.append_option(value.map(|d| d.to_string()));
1250 }
1251 builder.finish()
1252 }
1253}
1254
1255impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1257 type Error = ArrayError;
1258
1259 fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1260 if array.scale() < 0 {
1261 bail!("support negative scale for arrow decimal")
1262 }
1263 let from_arrow = |value| {
1264 const NAN: i128 = i128::MIN + 1;
1265 let res = match value {
1266 NAN => Decimal::NaN,
1267 i128::MAX => Decimal::PositiveInf,
1268 i128::MIN => Decimal::NegativeInf,
1269 _ => Decimal::Normalized(
1270 rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1271 .map_err(ArrayError::internal)?,
1272 ),
1273 };
1274 Ok(res)
1275 };
1276 array
1277 .iter()
1278 .map(|o| o.map(from_arrow).transpose())
1279 .collect::<Result<Self, Self::Error>>()
1280 }
1281}
1282
1283impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1285 type Error = ArrayError;
1286
1287 fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1288 let from_arrow = |value| {
1289 let res = Decimal::from(value);
1291 Ok(res)
1292 };
1293
1294 array
1296 .iter()
1297 .map(|o| o.map(from_arrow).transpose())
1298 .collect::<Result<Self, Self::Error>>()
1299 }
1300}
1301
1302impl TryFrom<&arrow_array::Float16Array> for F32Array {
1303 type Error = ArrayError;
1304
1305 fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1306 let from_arrow = |value| Ok(f32::from(value));
1307
1308 array
1309 .iter()
1310 .map(|o| o.map(from_arrow).transpose())
1311 .collect::<Result<Self, Self::Error>>()
1312 }
1313}
1314
1315impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1316 type Error = ArrayError;
1317
1318 fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1319 array
1320 .iter()
1321 .map(|o| {
1322 o.map(|s| {
1323 let s = std::str::from_utf8(s)
1324 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1325 s.parse()
1326 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1327 })
1328 .transpose()
1329 })
1330 .try_collect()
1331 }
1332}
1333
1334impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1335 type Error = ArrayError;
1336
1337 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1338 array
1339 .iter()
1340 .map(|o| {
1341 o.map(|s| {
1342 s.parse()
1343 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1344 })
1345 .transpose()
1346 })
1347 .try_collect()
1348 }
1349}
1350
1351impl From<&JsonbArray> for arrow_array::StringArray {
1352 fn from(array: &JsonbArray) -> Self {
1353 let mut builder =
1354 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1355 for value in array.iter() {
1356 match value {
1357 Some(jsonb) => {
1358 write!(&mut builder, "{}", jsonb).unwrap();
1359 builder.append_value("");
1360 }
1361 None => builder.append_null(),
1362 }
1363 }
1364 builder.finish()
1365 }
1366}
1367
1368impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1369 type Error = ArrayError;
1370
1371 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1372 array
1373 .iter()
1374 .map(|o| {
1375 o.map(|s| {
1376 s.parse()
1377 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1378 })
1379 .transpose()
1380 })
1381 .try_collect()
1382 }
1383}
1384
1385impl From<&IntervalArray> for arrow_array::StringArray {
1386 fn from(array: &IntervalArray) -> Self {
1387 let mut builder =
1388 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1389 for value in array.iter() {
1390 match value {
1391 Some(interval) => {
1392 write!(&mut builder, "{}", interval).unwrap();
1393 builder.append_value("");
1394 }
1395 None => builder.append_null(),
1396 }
1397 }
1398 builder.finish()
1399 }
1400}
1401
1402impl From<&JsonbArray> for arrow_array::LargeStringArray {
1403 fn from(array: &JsonbArray) -> Self {
1404 let mut builder =
1405 arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1406 for value in array.iter() {
1407 match value {
1408 Some(jsonb) => {
1409 write!(&mut builder, "{}", jsonb).unwrap();
1410 builder.append_value("");
1411 }
1412 None => builder.append_null(),
1413 }
1414 }
1415 builder.finish()
1416 }
1417}
1418
1419impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1420 type Error = ArrayError;
1421
1422 fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1423 array
1424 .iter()
1425 .map(|o| {
1426 o.map(|s| {
1427 s.parse()
1428 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1429 })
1430 .transpose()
1431 })
1432 .try_collect()
1433 }
1434}
1435
1436impl From<arrow_buffer::i256> for Int256 {
1437 fn from(value: arrow_buffer::i256) -> Self {
1438 let buffer = value.to_be_bytes();
1439 Int256::from_be_bytes(buffer)
1440 }
1441}
1442
1443impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1444 fn from(val: Int256Ref<'a>) -> Self {
1445 let buffer = val.to_be_bytes();
1446 arrow_buffer::i256::from_be_bytes(buffer)
1447 }
1448}
1449
1450impl From<&Int256Array> for arrow_array::Decimal256Array {
1451 fn from(array: &Int256Array) -> Self {
1452 array
1453 .iter()
1454 .map(|o| o.map(arrow_buffer::i256::from))
1455 .collect()
1456 }
1457}
1458
1459impl From<&arrow_array::Decimal256Array> for Int256Array {
1460 fn from(array: &arrow_array::Decimal256Array) -> Self {
1461 let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1462
1463 values
1464 .iter()
1465 .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1466 .collect()
1467 }
1468}
1469
1470pub fn is_parquet_schema_match_source_schema(
1486 arrow_data_type: &arrow_schema::DataType,
1487 rw_data_type: &crate::types::DataType,
1488) -> bool {
1489 use arrow_schema::DataType as ArrowType;
1490
1491 use crate::types::{DataType as RwType, MapType, StructType};
1492
1493 match (arrow_data_type, rw_data_type) {
1494 (ArrowType::Boolean, RwType::Boolean)
1496 | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1497 | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1498 | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1499 | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1500 | (ArrowType::Decimal256(_, _), RwType::Int256)
1501 | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1502 | (ArrowType::Float64, RwType::Float64)
1503 | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1504 | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1505 | (ArrowType::Date32, RwType::Date)
1506 | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1507 | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1508 | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1509 | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1510
1511 (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1514 if arrow_fields.len() != rw_struct.len() {
1515 return false;
1516 }
1517 for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1518 {
1519 if arrow_field.name() != rw_name {
1520 return false;
1521 }
1522 if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1523 return false;
1524 }
1525 }
1526 true
1527 }
1528 (ArrowType::List(arrow_field), RwType::List(rw_elem_ty)) => {
1531 is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_elem_ty)
1532 }
1533 (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1537 if let ArrowType::Struct(fields) = arrow_field.data_type() {
1538 if fields.len() != 2 {
1539 return false;
1540 }
1541 let key_field = &fields[0];
1542 let value_field = &fields[1];
1543 if key_field.name() != "key" || value_field.name() != "value" {
1544 return false;
1545 }
1546 let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1547 is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1548 && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1549 } else {
1550 false
1551 }
1552 }
1553 _ => false,
1555 }
1556}
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    #[test]
    fn test_struct_schema_match() {
        let arrow_struct = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct,
            &rw_struct
        ));

        let arrow_struct2 = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_struct2,
            &rw_struct
        ));
    }

    #[test]
    fn test_list_schema_match() {
        let arrow_list =
            ArrowType::List(Box::new(ArrowField::new("item", ArrowType::Float64, true)).into());
        let rw_list = RwType::List(Box::new(RwType::Float64));
        assert!(is_parquet_schema_match_source_schema(&arrow_list, &rw_list));

        let rw_list2 = RwType::List(Box::new(RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_list2
        ));
    }

    #[test]
    fn test_map_schema_match() {
        let arrow_map = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("key", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        let rw_map2 = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map2));

        let rw_map3 = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map3));

        let arrow_map2 = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("k", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        assert!(!is_parquet_schema_match_source_schema(&arrow_map2, &rw_map));
    }

    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    #[test]
    fn time() {
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    /// Round-trips a `Timestamp` through every arrow time unit. The value is a whole number
    /// of seconds so that each resolution represents it exactly.
    #[test]
    fn timestamp_all_units() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(1_234_567_000_000).ok()]);

        let arrow = arrow_array::TimestampSecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);

        let arrow = arrow_array::TimestampMillisecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);

        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);

        let arrow = arrow_array::TimestampNanosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    /// Round-trips a `Timestamptz` through every arrow time unit. The value is a whole
    /// number of seconds so that each resolution represents it exactly.
    #[test]
    fn timestamptz_all_units() {
        let array = TimestamptzArray::from_iter([
            None,
            Some(Timestamptz::from_micros(1_234_567_000_000)),
        ]);

        let arrow = arrow_array::TimestampSecondArray::from(&array);
        assert_eq!(TimestamptzArray::from(&arrow), array);

        let arrow = arrow_array::TimestampMillisecondArray::from(&array);
        assert_eq!(TimestamptzArray::from(&arrow), array);

        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestamptzArray::from(&arrow), array);

        let arrow = arrow_array::TimestampNanosecondArray::from(&array);
        assert_eq!(TimestamptzArray::from(&arrow), array);
    }

    #[test]
    fn interval() {
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    /// `Decimal128` input: a regular value read with its scale, plus the sentinel encodings
    /// for NaN and the two infinities.
    #[test]
    fn decimal128() {
        let arrow = arrow_array::Decimal128Array::from(vec![
            None,
            Some(12345),
            Some(i128::MIN + 1),
            Some(i128::MAX),
            Some(i128::MIN),
        ])
        .with_precision_and_scale(38, 2)
        .unwrap();
        let expected = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("123.45".parse().unwrap())),
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
        ]);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), expected);
    }

    /// A negative arrow decimal scale is rejected rather than misread.
    #[test]
    fn decimal128_negative_scale() {
        let arrow = arrow_array::Decimal128Array::from(vec![Some(1)])
            .with_precision_and_scale(10, -2)
            .unwrap();
        assert!(DecimalArray::try_from(&arrow).is_err());
    }

    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    #[test]
    fn int256() {
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}