1#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60pub trait ToArrow {
65 fn to_record_batch(
69 &self,
70 schema: arrow_schema::SchemaRef,
71 chunk: &DataChunk,
72 ) -> Result<arrow_array::RecordBatch, ArrayError> {
73 if !chunk.is_compacted() {
75 let c = chunk.clone();
76 return self.to_record_batch(schema, &c.compact());
77 }
78
79 let columns: Vec<_> = chunk
81 .columns()
82 .iter()
83 .zip_eq_fast(schema.fields().iter())
84 .map(|(column, field)| self.to_array(field.data_type(), column))
85 .try_collect()?;
86
87 let opts =
89 arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90 arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91 .map_err(ArrayError::to_arrow)
92 }
93
94 fn to_array(
96 &self,
97 data_type: &arrow_schema::DataType,
98 array: &ArrayImpl,
99 ) -> Result<arrow_array::ArrayRef, ArrayError> {
100 let arrow_array = match array {
101 ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102 ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103 ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104 ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105 ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106 ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107 ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108 ArrayImpl::Date(array) => self.date_to_arrow(array),
109 ArrayImpl::Time(array) => self.time_to_arrow(array),
110 ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111 ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112 ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113 ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114 ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115 ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116 ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117 ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118 ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119 ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120 ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121 ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122 }?;
123 if arrow_array.data_type() != data_type {
124 arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125 } else {
126 Ok(arrow_array)
127 }
128 }
129
130 #[inline]
131 fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132 Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133 }
134
135 #[inline]
136 fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137 Ok(Arc::new(arrow_array::Int16Array::from(array)))
138 }
139
140 #[inline]
141 fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142 Ok(Arc::new(arrow_array::Int32Array::from(array)))
143 }
144
145 #[inline]
146 fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147 Ok(Arc::new(arrow_array::Int64Array::from(array)))
148 }
149
150 #[inline]
151 fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152 Ok(Arc::new(arrow_array::Float32Array::from(array)))
153 }
154
155 #[inline]
156 fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157 Ok(Arc::new(arrow_array::Float64Array::from(array)))
158 }
159
160 #[inline]
161 fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162 Ok(Arc::new(arrow_array::StringArray::from(array)))
163 }
164
165 #[inline]
166 fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167 Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168 }
169
170 #[inline]
171 fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172 Ok(Arc::new(arrow_array::Date32Array::from(array)))
173 }
174
175 #[inline]
176 fn timestamp_to_arrow(
177 &self,
178 array: &TimestampArray,
179 ) -> Result<arrow_array::ArrayRef, ArrayError> {
180 Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181 array,
182 )))
183 }
184
185 #[inline]
186 fn timestamptz_to_arrow(
187 &self,
188 array: &TimestamptzArray,
189 ) -> Result<arrow_array::ArrayRef, ArrayError> {
190 Ok(Arc::new(
191 arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192 ))
193 }
194
195 #[inline]
196 fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197 Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198 }
199
200 #[inline]
201 fn interval_to_arrow(
202 &self,
203 array: &IntervalArray,
204 ) -> Result<arrow_array::ArrayRef, ArrayError> {
205 Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206 array,
207 )))
208 }
209
210 #[inline]
211 fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212 Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213 }
214
215 #[inline]
217 fn decimal_to_arrow(
218 &self,
219 _data_type: &arrow_schema::DataType,
220 array: &DecimalArray,
221 ) -> Result<arrow_array::ArrayRef, ArrayError> {
222 Ok(Arc::new(arrow_array::StringArray::from(array)))
223 }
224
225 #[inline]
227 fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228 Ok(Arc::new(arrow_array::StringArray::from(array)))
229 }
230
231 #[inline]
232 fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233 Ok(Arc::new(arrow_array::Int64Array::from(array)))
234 }
235
236 #[inline]
237 fn list_to_arrow(
238 &self,
239 data_type: &arrow_schema::DataType,
240 array: &ListArray,
241 ) -> Result<arrow_array::ArrayRef, ArrayError> {
242 let arrow_schema::DataType::List(field) = data_type else {
243 return Err(ArrayError::to_arrow("Invalid list type"));
244 };
245 let values = self.to_array(field.data_type(), array.values())?;
246 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248 Ok(Arc::new(arrow_array::ListArray::new(
249 field.clone(),
250 offsets,
251 values,
252 nulls,
253 )))
254 }
255
256 #[inline]
257 fn vector_to_arrow(
258 &self,
259 data_type: &arrow_schema::DataType,
260 array: &VectorArray,
261 ) -> Result<arrow_array::ArrayRef, ArrayError> {
262 let arrow_schema::DataType::List(field) = data_type else {
263 return Err(ArrayError::to_arrow("Invalid list type"));
264 };
265 if field.data_type() != &arrow_schema::DataType::Float32 {
266 return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267 }
268 let values = Arc::new(arrow_array::Float32Array::from(
269 array.as_raw_slice().to_vec(),
270 ));
271 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273 Ok(Arc::new(arrow_array::ListArray::new(
274 field.clone(),
275 offsets,
276 values,
277 nulls,
278 )))
279 }
280
281 #[inline]
282 fn struct_to_arrow(
283 &self,
284 data_type: &arrow_schema::DataType,
285 array: &StructArray,
286 ) -> Result<arrow_array::ArrayRef, ArrayError> {
287 let arrow_schema::DataType::Struct(fields) = data_type else {
288 return Err(ArrayError::to_arrow("Invalid struct type"));
289 };
290 Ok(Arc::new(arrow_array::StructArray::new(
291 fields.clone(),
292 array
293 .fields()
294 .zip_eq_fast(fields)
295 .map(|(arr, field)| self.to_array(field.data_type(), arr))
296 .try_collect::<_, _, ArrayError>()?,
297 Some(array.null_bitmap().into()),
298 )))
299 }
300
301 #[inline]
302 fn map_to_arrow(
303 &self,
304 data_type: &arrow_schema::DataType,
305 array: &MapArray,
306 ) -> Result<arrow_array::ArrayRef, ArrayError> {
307 let arrow_schema::DataType::Map(field, ordered) = data_type else {
308 return Err(ArrayError::to_arrow("Invalid map type"));
309 };
310 if *ordered {
311 return Err(ArrayError::to_arrow("Sorted map is not supported"));
312 }
313 let values = self
314 .struct_to_arrow(field.data_type(), array.as_struct())?
315 .as_struct()
316 .clone();
317 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
318 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
319 Ok(Arc::new(arrow_array::MapArray::new(
320 field.clone(),
321 offsets,
322 values,
323 nulls,
324 *ordered,
325 )))
326 }
327
328 fn to_arrow_field(
333 &self,
334 name: &str,
335 value: &DataType,
336 ) -> Result<arrow_schema::Field, ArrayError> {
337 let data_type = match value {
338 DataType::Boolean => self.bool_type_to_arrow(),
340 DataType::Int16 => self.int16_type_to_arrow(),
341 DataType::Int32 => self.int32_type_to_arrow(),
342 DataType::Int64 => self.int64_type_to_arrow(),
343 DataType::Int256 => self.int256_type_to_arrow(),
344 DataType::Float32 => self.float32_type_to_arrow(),
345 DataType::Float64 => self.float64_type_to_arrow(),
346 DataType::Date => self.date_type_to_arrow(),
347 DataType::Time => self.time_type_to_arrow(),
348 DataType::Timestamp => self.timestamp_type_to_arrow(),
349 DataType::Timestamptz => self.timestamptz_type_to_arrow(),
350 DataType::Interval => self.interval_type_to_arrow(),
351 DataType::Varchar => self.varchar_type_to_arrow(),
352 DataType::Bytea => self.bytea_type_to_arrow(),
353 DataType::Serial => self.serial_type_to_arrow(),
354 DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
355 DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
356 DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
357 DataType::List(list) => self.list_type_to_arrow(list)?,
358 DataType::Map(map) => self.map_type_to_arrow(map)?,
359 DataType::Vector(_) => self.vector_type_to_arrow()?,
360 };
361 Ok(arrow_schema::Field::new(name, data_type, true))
362 }
363
364 #[inline]
365 fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
366 arrow_schema::DataType::Boolean
367 }
368
369 #[inline]
370 fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
371 arrow_schema::DataType::Int16
372 }
373
374 #[inline]
375 fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
376 arrow_schema::DataType::Int32
377 }
378
379 #[inline]
380 fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
381 arrow_schema::DataType::Int64
382 }
383
384 #[inline]
385 fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
386 arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
387 }
388
389 #[inline]
390 fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
391 arrow_schema::DataType::Float32
392 }
393
394 #[inline]
395 fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
396 arrow_schema::DataType::Float64
397 }
398
399 #[inline]
400 fn date_type_to_arrow(&self) -> arrow_schema::DataType {
401 arrow_schema::DataType::Date32
402 }
403
404 #[inline]
405 fn time_type_to_arrow(&self) -> arrow_schema::DataType {
406 arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
407 }
408
409 #[inline]
410 fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
411 arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
412 }
413
414 #[inline]
415 fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
416 arrow_schema::DataType::Timestamp(
417 arrow_schema::TimeUnit::Microsecond,
418 Some("+00:00".into()),
419 )
420 }
421
422 #[inline]
423 fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
424 arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
425 }
426
427 #[inline]
428 fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
429 arrow_schema::DataType::Utf8
430 }
431
432 #[inline]
433 fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
434 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
435 .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
436 }
437
438 #[inline]
439 fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
440 arrow_schema::DataType::Binary
441 }
442
443 #[inline]
444 fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
445 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
446 .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
447 }
448
449 #[inline]
450 fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
451 arrow_schema::DataType::Int64
452 }
453
454 #[inline]
455 fn list_type_to_arrow(
456 &self,
457 list_type: &ListType,
458 ) -> Result<arrow_schema::DataType, ArrayError> {
459 Ok(arrow_schema::DataType::List(Arc::new(
460 self.to_arrow_field("item", list_type.elem())?,
461 )))
462 }
463
464 #[inline]
465 fn struct_type_to_arrow(
466 &self,
467 fields: &StructType,
468 ) -> Result<arrow_schema::DataType, ArrayError> {
469 Ok(arrow_schema::DataType::Struct(
470 fields
471 .iter()
472 .map(|(name, ty)| self.to_arrow_field(name, ty))
473 .try_collect::<_, _, ArrayError>()?,
474 ))
475 }
476
477 #[inline]
478 fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
479 let sorted = false;
480 let key = self
482 .to_arrow_field("key", map_type.key())?
483 .with_nullable(false);
484 let value = self.to_arrow_field("value", map_type.value())?;
485 Ok(arrow_schema::DataType::Map(
486 Arc::new(arrow_schema::Field::new(
487 "entries",
488 arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
489 false,
491 )),
492 sorted,
493 ))
494 }
495
496 #[inline]
497 fn vector_type_to_arrow(&self) -> Result<arrow_schema::DataType, ArrayError> {
498 Ok(arrow_schema::DataType::List(Arc::new(
499 self.to_arrow_field("item", &VECTOR_ITEM_TYPE)?,
500 )))
501 }
502}
503
504#[allow(clippy::wrong_self_convention)]
506pub trait FromArrow {
507 fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
509 let mut columns = Vec::with_capacity(batch.num_columns());
510 for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
511 let column = Arc::new(self.from_array(field, array)?);
512 columns.push(column);
513 }
514 Ok(DataChunk::new(columns, batch.num_rows()))
515 }
516
517 fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
519 Ok(StructType::new(
520 fields
521 .iter()
522 .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
523 .try_collect::<_, Vec<_>, ArrayError>()?,
524 ))
525 }
526
527 fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
529 use arrow_schema::DataType::*;
530 use arrow_schema::IntervalUnit::*;
531 use arrow_schema::TimeUnit::*;
532
533 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
535 return self.from_extension_type(type_name, field.data_type());
536 }
537
538 Ok(match field.data_type() {
539 Boolean => DataType::Boolean,
540 Int16 => DataType::Int16,
541 Int32 => DataType::Int32,
542 Int64 => DataType::Int64,
543 Int8 => DataType::Int16,
544 UInt8 => DataType::Int16,
545 UInt16 => DataType::Int32,
546 UInt32 => DataType::Int64,
547 UInt64 => DataType::Decimal,
548 Float16 => DataType::Float32,
549 Float32 => DataType::Float32,
550 Float64 => DataType::Float64,
551 Decimal128(_, _) => DataType::Decimal,
552 Decimal256(_, _) => DataType::Int256,
553 Date32 => DataType::Date,
554 Time64(Microsecond) => DataType::Time,
555 Timestamp(Microsecond, None) => DataType::Timestamp,
556 Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
557 Timestamp(Second, None) => DataType::Timestamp,
558 Timestamp(Second, Some(_)) => DataType::Timestamptz,
559 Timestamp(Millisecond, None) => DataType::Timestamp,
560 Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
561 Timestamp(Nanosecond, None) => DataType::Timestamp,
562 Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
563 Interval(MonthDayNano) => DataType::Interval,
564 Utf8 => DataType::Varchar,
565 Binary => DataType::Bytea,
566 LargeUtf8 => self.from_large_utf8()?,
567 LargeBinary => self.from_large_binary()?,
568 List(field) => DataType::list(self.from_field(field)?),
569 Struct(fields) => DataType::Struct(self.from_fields(fields)?),
570 Map(field, _is_sorted) => {
571 let entries = self.from_field(field)?;
572 DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
573 ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
574 })?)
575 }
576 t => {
577 return Err(ArrayError::from_arrow(format!(
578 "unsupported arrow data type: {t:?}"
579 )));
580 }
581 })
582 }
583
584 fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
586 Ok(DataType::Varchar)
587 }
588
589 fn from_large_binary(&self) -> Result<DataType, ArrayError> {
591 Ok(DataType::Bytea)
592 }
593
594 fn from_extension_type(
596 &self,
597 type_name: &str,
598 physical_type: &arrow_schema::DataType,
599 ) -> Result<DataType, ArrayError> {
600 match (type_name, physical_type) {
601 ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
602 ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
603 _ => Err(ArrayError::from_arrow(format!(
604 "unsupported extension type: {type_name:?}"
605 ))),
606 }
607 }
608
609 fn from_array(
611 &self,
612 field: &arrow_schema::Field,
613 array: &arrow_array::ArrayRef,
614 ) -> Result<ArrayImpl, ArrayError> {
615 use arrow_schema::DataType::*;
616 use arrow_schema::IntervalUnit::*;
617 use arrow_schema::TimeUnit::*;
618
619 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
621 return self.from_extension_array(type_name, array);
622 }
623 match array.data_type() {
624 Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
625 Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
626 Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
627 Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
628 Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
629 UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
630 UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
631 UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
632
633 UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
634 Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
635 Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
636 Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
637 Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
638 Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
639 Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
640 Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
641 Timestamp(Second, None) => {
642 self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
643 }
644 Timestamp(Second, Some(_)) => {
645 self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
646 }
647 Timestamp(Millisecond, None) => {
648 self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
649 }
650 Timestamp(Millisecond, Some(_)) => {
651 self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
652 }
653 Timestamp(Microsecond, None) => {
654 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
655 }
656 Timestamp(Microsecond, Some(_)) => {
657 self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
658 }
659 Timestamp(Nanosecond, None) => {
660 self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
661 }
662 Timestamp(Nanosecond, Some(_)) => {
663 self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
664 }
665 Interval(MonthDayNano) => {
666 self.from_interval_array(array.as_any().downcast_ref().unwrap())
667 }
668 Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
669 Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
670 LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
671 LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
672 List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
673 Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
674 Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
675 t => Err(ArrayError::from_arrow(format!(
676 "unsupported arrow data type: {t:?}",
677 ))),
678 }
679 }
680
681 fn from_extension_array(
683 &self,
684 type_name: &str,
685 array: &arrow_array::ArrayRef,
686 ) -> Result<ArrayImpl, ArrayError> {
687 match type_name {
688 "arrowudf.decimal" => {
689 let array: &arrow_array::StringArray =
690 array.as_any().downcast_ref().ok_or_else(|| {
691 ArrayError::from_arrow(
692 "expected string array for `arrowudf.decimal`".to_owned(),
693 )
694 })?;
695 Ok(ArrayImpl::Decimal(array.try_into()?))
696 }
697 "arrowudf.json" => {
698 let array: &arrow_array::StringArray =
699 array.as_any().downcast_ref().ok_or_else(|| {
700 ArrayError::from_arrow(
701 "expected string array for `arrowudf.json`".to_owned(),
702 )
703 })?;
704 Ok(ArrayImpl::Jsonb(array.try_into()?))
705 }
706 _ => Err(ArrayError::from_arrow(format!(
707 "unsupported extension type: {type_name:?}"
708 ))),
709 }
710 }
711
712 fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
713 Ok(ArrayImpl::Bool(array.into()))
714 }
715
716 fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
717 Ok(ArrayImpl::Int16(array.into()))
718 }
719
720 fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
721 Ok(ArrayImpl::Int16(array.into()))
722 }
723
724 fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
725 Ok(ArrayImpl::Int16(array.into()))
726 }
727
728 fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
729 Ok(ArrayImpl::Int32(array.into()))
730 }
731
732 fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
733 Ok(ArrayImpl::Int64(array.into()))
734 }
735
736 fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
737 Ok(ArrayImpl::Int32(array.into()))
738 }
739
740 fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
741 Ok(ArrayImpl::Int64(array.into()))
742 }
743
744 fn from_int256_array(
745 &self,
746 array: &arrow_array::Decimal256Array,
747 ) -> Result<ArrayImpl, ArrayError> {
748 Ok(ArrayImpl::Int256(array.into()))
749 }
750
751 fn from_decimal128_array(
752 &self,
753 array: &arrow_array::Decimal128Array,
754 ) -> Result<ArrayImpl, ArrayError> {
755 Ok(ArrayImpl::Decimal(array.try_into()?))
756 }
757
758 fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
759 Ok(ArrayImpl::Decimal(array.try_into()?))
760 }
761
762 fn from_float16_array(
763 &self,
764 array: &arrow_array::Float16Array,
765 ) -> Result<ArrayImpl, ArrayError> {
766 Ok(ArrayImpl::Float32(array.try_into()?))
767 }
768
769 fn from_float32_array(
770 &self,
771 array: &arrow_array::Float32Array,
772 ) -> Result<ArrayImpl, ArrayError> {
773 Ok(ArrayImpl::Float32(array.into()))
774 }
775
776 fn from_float64_array(
777 &self,
778 array: &arrow_array::Float64Array,
779 ) -> Result<ArrayImpl, ArrayError> {
780 Ok(ArrayImpl::Float64(array.into()))
781 }
782
783 fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
784 Ok(ArrayImpl::Date(array.into()))
785 }
786
787 fn from_time64us_array(
788 &self,
789 array: &arrow_array::Time64MicrosecondArray,
790 ) -> Result<ArrayImpl, ArrayError> {
791 Ok(ArrayImpl::Time(array.into()))
792 }
793
794 fn from_timestampsecond_array(
795 &self,
796 array: &arrow_array::TimestampSecondArray,
797 ) -> Result<ArrayImpl, ArrayError> {
798 Ok(ArrayImpl::Timestamp(array.into()))
799 }
800 fn from_timestampsecond_some_array(
801 &self,
802 array: &arrow_array::TimestampSecondArray,
803 ) -> Result<ArrayImpl, ArrayError> {
804 Ok(ArrayImpl::Timestamptz(array.into()))
805 }
806
807 fn from_timestampms_array(
808 &self,
809 array: &arrow_array::TimestampMillisecondArray,
810 ) -> Result<ArrayImpl, ArrayError> {
811 Ok(ArrayImpl::Timestamp(array.into()))
812 }
813
814 fn from_timestampms_some_array(
815 &self,
816 array: &arrow_array::TimestampMillisecondArray,
817 ) -> Result<ArrayImpl, ArrayError> {
818 Ok(ArrayImpl::Timestamptz(array.into()))
819 }
820
821 fn from_timestampus_array(
822 &self,
823 array: &arrow_array::TimestampMicrosecondArray,
824 ) -> Result<ArrayImpl, ArrayError> {
825 Ok(ArrayImpl::Timestamp(array.into()))
826 }
827
828 fn from_timestampus_some_array(
829 &self,
830 array: &arrow_array::TimestampMicrosecondArray,
831 ) -> Result<ArrayImpl, ArrayError> {
832 Ok(ArrayImpl::Timestamptz(array.into()))
833 }
834
835 fn from_timestampns_array(
836 &self,
837 array: &arrow_array::TimestampNanosecondArray,
838 ) -> Result<ArrayImpl, ArrayError> {
839 Ok(ArrayImpl::Timestamp(array.into()))
840 }
841
842 fn from_timestampns_some_array(
843 &self,
844 array: &arrow_array::TimestampNanosecondArray,
845 ) -> Result<ArrayImpl, ArrayError> {
846 Ok(ArrayImpl::Timestamptz(array.into()))
847 }
848
849 fn from_interval_array(
850 &self,
851 array: &arrow_array::IntervalMonthDayNanoArray,
852 ) -> Result<ArrayImpl, ArrayError> {
853 Ok(ArrayImpl::Interval(array.into()))
854 }
855
856 fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
857 Ok(ArrayImpl::Utf8(array.into()))
858 }
859
860 fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
861 Ok(ArrayImpl::Bytea(array.into()))
862 }
863
864 fn from_large_utf8_array(
865 &self,
866 array: &arrow_array::LargeStringArray,
867 ) -> Result<ArrayImpl, ArrayError> {
868 Ok(ArrayImpl::Utf8(array.into()))
869 }
870
871 fn from_large_binary_array(
872 &self,
873 array: &arrow_array::LargeBinaryArray,
874 ) -> Result<ArrayImpl, ArrayError> {
875 Ok(ArrayImpl::Bytea(array.into()))
876 }
877
878 fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
879 use arrow_array::Array;
880 let arrow_schema::DataType::List(field) = array.data_type() else {
881 panic!("nested field types cannot be determined.");
882 };
883 Ok(ArrayImpl::List(ListArray {
884 value: Box::new(self.from_array(field, array.values())?),
885 bitmap: match array.nulls() {
886 Some(nulls) => nulls.iter().collect(),
887 None => Bitmap::ones(array.len()),
888 },
889 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
890 }))
891 }
892
893 fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
894 use arrow_array::Array;
895 let arrow_schema::DataType::Struct(fields) = array.data_type() else {
896 panic!("nested field types cannot be determined.");
897 };
898 Ok(ArrayImpl::Struct(StructArray::new(
899 self.from_fields(fields)?,
900 array
901 .columns()
902 .iter()
903 .zip_eq_fast(fields)
904 .map(|(array, field)| self.from_array(field, array).map(Arc::new))
905 .try_collect()?,
906 (0..array.len()).map(|i| array.is_valid(i)).collect(),
907 )))
908 }
909
910 fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
911 use arrow_array::Array;
912 let struct_array = self.from_struct_array(array.entries())?;
913 let list_array = ListArray {
914 value: Box::new(struct_array),
915 bitmap: match array.nulls() {
916 Some(nulls) => nulls.iter().collect(),
917 None => Bitmap::ones(array.len()),
918 },
919 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
920 };
921
922 Ok(ArrayImpl::Map(MapArray { inner: list_array }))
923 }
924}
925
926impl From<&Bitmap> for arrow_buffer::NullBuffer {
927 fn from(bitmap: &Bitmap) -> Self {
928 bitmap.iter().collect()
929 }
930}
931
/// Implements `From` conversions between an array type and an Arrow array type.
///
/// Three impls are generated: array → Arrow, Arrow → array, and a slice of Arrow arrays →
/// one array holding all their items in order.
///
/// * `converts!(A, B)` — both sides yield the same item type, so items are passed through.
/// * `converts!(A, B, @map)` — items are translated one by one through `FromIntoArrow`.
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                FromIterator::from_iter(array.iter())
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                FromIterator::from_iter(array.iter())
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                // Concatenate the items of every Arrow array in order.
                FromIterator::from_iter(arrays.iter().flat_map(|chunk| chunk.iter()))
            }
        }
    };
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                FromIterator::from_iter(
                    array
                        .iter()
                        .map(|item| item.map(|value| value.into_arrow())),
                )
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                FromIterator::from_iter(array.iter().map(|item| {
                    item.map(|raw| {
                        <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                    })
                }))
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                // Concatenate the items of every Arrow array in order, translating each one.
                FromIterator::from_iter(arrays.iter().flat_map(|chunk| chunk.iter()).map(
                    |item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                        })
                    },
                ))
            }
        }
    };
}
985
/// Implements `From` conversions between an array type and an Arrow array type whose element
/// types differ, bridging them with `as` casts.
///
/// `$FromType` is the element type on the `$ArrayType` side and `$ToType` the element type on
/// the `$ArrowType` side. Three impls are generated: array → Arrow, Arrow → array, and a slice
/// of Arrow arrays → one array holding all their items in order.
///
/// Items are streamed directly into `from_iter`; no intermediate `Vec` is allocated.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                // NOTE: `as` does not check the range; values outside `$ToType` are wrapped.
                <$ArrowType>::from_iter(array.iter().map(|x| x.map(|v| v as $ToType)))
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                <$ArrayType>::from_iter(array.iter().map(|x| x.map(|v| v as $FromType)))
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                // Concatenate the items of every Arrow array in order.
                <$ArrayType>::from_iter(
                    arrays
                        .iter()
                        .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType))),
                )
            }
        }
    };
}
1016
/// Implements `From` conversions between an array type and an Arrow array type whose values
/// are expressed in a specific time unit.
///
/// Each item is translated through `FromIntoArrowWithUnit`, passing `$time_unit` along. Three
/// impls are generated: array → Arrow, Arrow → array, and a slice of Arrow arrays → one array
/// holding all their items in order.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                // Concatenate the items of every Arrow array in order, translating each one.
                arrays
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1053
// Conversions where items are passed through unchanged (plain arm) or translated one by one
// through `FromIntoArrow` (`@map` arm).
converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
converts!(I64Array, arrow_array::Int64Array);
converts!(F32Array, arrow_array::Float32Array, @map);
converts!(F64Array, arrow_array::Float64Array, @map);
converts!(BytesArray, arrow_array::BinaryArray);
converts!(BytesArray, arrow_array::LargeBinaryArray);
converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

// Conversions to and from narrower or unsigned Arrow integer arrays, bridged with `as` casts.
converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);

// Timestamp conversions for every Arrow time unit.
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

// Timestamptz conversions for every Arrow time unit.
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1083
/// Conversion of a single scalar value to and from its arrow-side representation.
trait FromIntoArrow {
    /// The arrow-side value type this scalar is converted to and from.
    type ArrowType;
    /// Builds the scalar from an arrow-side value.
    fn from_arrow(value: Self::ArrowType) -> Self;
    /// Consumes the scalar and returns its arrow-side value.
    fn into_arrow(self) -> Self::ArrowType;
}
1091
/// Like `FromIntoArrow`, but the conversion additionally depends on a unit argument
/// (the implementations in this file use arrow's `TimeUnit`).
trait FromIntoArrowWithUnit {
    /// The arrow-side value type this scalar is converted to and from.
    type ArrowType;
    /// The type of the unit argument that selects how the arrow value is interpreted.
    type TimestampType;
    /// Builds the scalar from an arrow-side value expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
    /// Consumes the scalar and returns its arrow-side value expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1101
1102impl FromIntoArrow for Serial {
1103 type ArrowType = i64;
1104
1105 fn from_arrow(value: Self::ArrowType) -> Self {
1106 value.into()
1107 }
1108
1109 fn into_arrow(self) -> Self::ArrowType {
1110 self.into()
1111 }
1112}
1113
1114impl FromIntoArrow for F32 {
1115 type ArrowType = f32;
1116
1117 fn from_arrow(value: Self::ArrowType) -> Self {
1118 value.into()
1119 }
1120
1121 fn into_arrow(self) -> Self::ArrowType {
1122 self.into()
1123 }
1124}
1125
1126impl FromIntoArrow for F64 {
1127 type ArrowType = f64;
1128
1129 fn from_arrow(value: Self::ArrowType) -> Self {
1130 value.into()
1131 }
1132
1133 fn into_arrow(self) -> Self::ArrowType {
1134 self.into()
1135 }
1136}
1137
1138impl FromIntoArrow for Date {
1139 type ArrowType = i32;
1140
1141 fn from_arrow(value: Self::ArrowType) -> Self {
1142 Date(arrow_array::types::Date32Type::to_naive_date(value))
1143 }
1144
1145 fn into_arrow(self) -> Self::ArrowType {
1146 arrow_array::types::Date32Type::from_naive_date(self.0)
1147 }
1148}
1149
1150impl FromIntoArrow for Time {
1151 type ArrowType = i64;
1152
1153 fn from_arrow(value: Self::ArrowType) -> Self {
1154 Time(
1155 NaiveTime::from_num_seconds_from_midnight_opt(
1156 (value / 1_000_000) as _,
1157 (value % 1_000_000 * 1000) as _,
1158 )
1159 .unwrap(),
1160 )
1161 }
1162
1163 fn into_arrow(self) -> Self::ArrowType {
1164 self.0
1165 .signed_duration_since(NaiveTime::default())
1166 .num_microseconds()
1167 .unwrap()
1168 }
1169}
1170
1171impl FromIntoArrowWithUnit for Timestamp {
1172 type ArrowType = i64;
1173 type TimestampType = TimeUnit;
1174
1175 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1176 match time_unit {
1177 TimeUnit::Second => {
1178 Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1179 }
1180 TimeUnit::Millisecond => {
1181 Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1182 }
1183 TimeUnit::Microsecond => {
1184 Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1185 }
1186 TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1187 }
1188 }
1189
1190 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1191 match time_unit {
1192 TimeUnit::Second => self.0.and_utc().timestamp(),
1193 TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1194 TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1195 TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1196 }
1197 }
1198}
1199
/// `Timestamptz` is exchanged with arrow as an `i64` whose resolution is given by the
/// accompanying `TimeUnit`.
impl FromIntoArrowWithUnit for Timestamptz {
    type ArrowType = i64;
    type TimestampType = TimeUnit;

    /// Builds a `Timestamptz` from `value` interpreted in `time_unit`.
    ///
    /// For seconds, milliseconds and nanoseconds a failed conversion does not panic or
    /// error: it silently falls back to `Timestamptz::default()`. Microseconds are passed
    /// straight to `Timestamptz::from_micros`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
        match time_unit {
            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
            // NOTE(review): no fallback needed here, presumably because microseconds are the
            // native resolution of `Timestamptz` -- confirm.
            TimeUnit::Microsecond => Timestamptz::from_micros(value),
            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
        }
    }

    /// Expresses the timestamp in `time_unit`.
    ///
    /// # Panics
    ///
    /// Panics for `TimeUnit::Nanosecond` when `timestamp_nanos()` yields no value.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
        match time_unit {
            TimeUnit::Second => self.timestamp(),
            TimeUnit::Millisecond => self.timestamp_millis(),
            TimeUnit::Microsecond => self.timestamp_micros(),
            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
        }
    }
}
1222
1223impl FromIntoArrow for Interval {
1224 type ArrowType = ArrowIntervalType;
1225
1226 fn from_arrow(value: Self::ArrowType) -> Self {
1227 Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1228 }
1229
1230 fn into_arrow(self) -> Self::ArrowType {
1231 ArrowIntervalType {
1232 months: self.months(),
1233 days: self.days(),
1234 nanoseconds: self.usecs() * 1000,
1236 }
1237 }
1238}
1239
1240impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1241 fn from(array: &DecimalArray) -> Self {
1242 let mut builder =
1243 arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1244 for value in array.iter() {
1245 builder.append_option(value.map(|d| d.to_string()));
1246 }
1247 builder.finish()
1248 }
1249}
1250
1251impl From<&DecimalArray> for arrow_array::StringArray {
1252 fn from(array: &DecimalArray) -> Self {
1253 let mut builder =
1254 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1255 for value in array.iter() {
1256 builder.append_option(value.map(|d| d.to_string()));
1257 }
1258 builder.finish()
1259 }
1260}
1261
1262impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1264 type Error = ArrayError;
1265
1266 fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1267 if array.scale() < 0 {
1268 bail!("support negative scale for arrow decimal")
1269 }
1270 let from_arrow = |value| {
1271 const NAN: i128 = i128::MIN + 1;
1272 let res = match value {
1273 NAN => Decimal::NaN,
1274 i128::MAX => Decimal::PositiveInf,
1275 i128::MIN => Decimal::NegativeInf,
1276 _ => Decimal::Normalized(
1277 rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1278 .map_err(ArrayError::internal)?,
1279 ),
1280 };
1281 Ok(res)
1282 };
1283 array
1284 .iter()
1285 .map(|o| o.map(from_arrow).transpose())
1286 .collect::<Result<Self, Self::Error>>()
1287 }
1288}
1289
1290impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1292 type Error = ArrayError;
1293
1294 fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1295 let from_arrow = |value| {
1296 let res = Decimal::from(value);
1298 Ok(res)
1299 };
1300
1301 array
1303 .iter()
1304 .map(|o| o.map(from_arrow).transpose())
1305 .collect::<Result<Self, Self::Error>>()
1306 }
1307}
1308
1309impl TryFrom<&arrow_array::Float16Array> for F32Array {
1310 type Error = ArrayError;
1311
1312 fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1313 let from_arrow = |value| Ok(f32::from(value));
1314
1315 array
1316 .iter()
1317 .map(|o| o.map(from_arrow).transpose())
1318 .collect::<Result<Self, Self::Error>>()
1319 }
1320}
1321
1322impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1323 type Error = ArrayError;
1324
1325 fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1326 array
1327 .iter()
1328 .map(|o| {
1329 o.map(|s| {
1330 let s = std::str::from_utf8(s)
1331 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1332 s.parse()
1333 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1334 })
1335 .transpose()
1336 })
1337 .try_collect()
1338 }
1339}
1340
1341impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1342 type Error = ArrayError;
1343
1344 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1345 array
1346 .iter()
1347 .map(|o| {
1348 o.map(|s| {
1349 s.parse()
1350 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1351 })
1352 .transpose()
1353 })
1354 .try_collect()
1355 }
1356}
1357
1358impl From<&JsonbArray> for arrow_array::StringArray {
1359 fn from(array: &JsonbArray) -> Self {
1360 let mut builder =
1361 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1362 for value in array.iter() {
1363 match value {
1364 Some(jsonb) => {
1365 write!(&mut builder, "{}", jsonb).unwrap();
1366 builder.append_value("");
1367 }
1368 None => builder.append_null(),
1369 }
1370 }
1371 builder.finish()
1372 }
1373}
1374
1375impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1376 type Error = ArrayError;
1377
1378 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1379 array
1380 .iter()
1381 .map(|o| {
1382 o.map(|s| {
1383 s.parse()
1384 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1385 })
1386 .transpose()
1387 })
1388 .try_collect()
1389 }
1390}
1391
1392impl From<&IntervalArray> for arrow_array::StringArray {
1393 fn from(array: &IntervalArray) -> Self {
1394 let mut builder =
1395 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1396 for value in array.iter() {
1397 match value {
1398 Some(interval) => {
1399 write!(&mut builder, "{}", interval).unwrap();
1400 builder.append_value("");
1401 }
1402 None => builder.append_null(),
1403 }
1404 }
1405 builder.finish()
1406 }
1407}
1408
1409impl From<&JsonbArray> for arrow_array::LargeStringArray {
1410 fn from(array: &JsonbArray) -> Self {
1411 let mut builder =
1412 arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1413 for value in array.iter() {
1414 match value {
1415 Some(jsonb) => {
1416 write!(&mut builder, "{}", jsonb).unwrap();
1417 builder.append_value("");
1418 }
1419 None => builder.append_null(),
1420 }
1421 }
1422 builder.finish()
1423 }
1424}
1425
1426impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1427 type Error = ArrayError;
1428
1429 fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1430 array
1431 .iter()
1432 .map(|o| {
1433 o.map(|s| {
1434 s.parse()
1435 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1436 })
1437 .transpose()
1438 })
1439 .try_collect()
1440 }
1441}
1442
1443impl From<arrow_buffer::i256> for Int256 {
1444 fn from(value: arrow_buffer::i256) -> Self {
1445 let buffer = value.to_be_bytes();
1446 Int256::from_be_bytes(buffer)
1447 }
1448}
1449
1450impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1451 fn from(val: Int256Ref<'a>) -> Self {
1452 let buffer = val.to_be_bytes();
1453 arrow_buffer::i256::from_be_bytes(buffer)
1454 }
1455}
1456
1457impl From<&Int256Array> for arrow_array::Decimal256Array {
1458 fn from(array: &Int256Array) -> Self {
1459 array
1460 .iter()
1461 .map(|o| o.map(arrow_buffer::i256::from))
1462 .collect()
1463 }
1464}
1465
1466impl From<&arrow_array::Decimal256Array> for Int256Array {
1467 fn from(array: &arrow_array::Decimal256Array) -> Self {
1468 let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1469
1470 values
1471 .iter()
1472 .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1473 .collect()
1474 }
1475}
1476
1477pub fn is_parquet_schema_match_source_schema(
1493 arrow_data_type: &arrow_schema::DataType,
1494 rw_data_type: &crate::types::DataType,
1495) -> bool {
1496 use arrow_schema::DataType as ArrowType;
1497
1498 use crate::types::{DataType as RwType, MapType, StructType};
1499
1500 match (arrow_data_type, rw_data_type) {
1501 (ArrowType::Boolean, RwType::Boolean)
1503 | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1504 | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1505 | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1506 | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1507 | (ArrowType::Decimal256(_, _), RwType::Int256)
1508 | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1509 | (ArrowType::Float64, RwType::Float64)
1510 | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1511 | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1512 | (ArrowType::Date32, RwType::Date)
1513 | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1514 | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1515 | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1516 | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1517
1518 (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1521 if arrow_fields.len() != rw_struct.len() {
1522 return false;
1523 }
1524 for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1525 {
1526 if arrow_field.name() != rw_name {
1527 return false;
1528 }
1529 if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1530 return false;
1531 }
1532 }
1533 true
1534 }
1535 (ArrowType::List(arrow_field), RwType::List(rw_list_ty)) => {
1538 is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_list_ty.elem())
1539 }
1540 (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1544 if let ArrowType::Struct(fields) = arrow_field.data_type() {
1545 if fields.len() != 2 {
1546 return false;
1547 }
1548 let key_field = &fields[0];
1549 let value_field = &fields[1];
1550 if key_field.name() != "key" || value_field.name() != "value" {
1551 return false;
1552 }
1553 let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1554 is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1555 && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1556 } else {
1557 false
1558 }
1559 }
1560 _ => false,
1562 }
1563}
// Unit tests for the schema-matching helper and for the array conversions in this file.
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    // Structs match when field names and types line up; a renamed field is a mismatch.
    #[test]
    fn test_struct_schema_match() {
        let arrow_struct = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct,
            &rw_struct
        ));

        // Same types, but the second field is named `f3` instead of `f2`.
        let arrow_struct2 = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_struct2,
            &rw_struct
        ));
    }

    // Lists match only when their element types match.
    #[test]
    fn test_list_schema_match() {
        let arrow_list =
            ArrowType::List(Box::new(ArrowField::new("item", ArrowType::Float64, true)).into());
        let rw_list = RwType::Float64.list();
        assert!(is_parquet_schema_match_source_schema(&arrow_list, &rw_list));

        let rw_list2 = RwType::Int32.list();
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_list2
        ));
    }

    // Maps match when the entries struct is (`key`, `value`) with matching types.
    #[test]
    fn test_map_schema_match() {
        let arrow_map = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("key", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        // Key type differs.
        let rw_map2 = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map2));

        // Value type differs.
        let rw_map3 = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map3));

        // Entries struct whose first field is named `k` rather than `key`.
        let arrow_map2 = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("k", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        assert!(!is_parquet_schema_match_source_schema(&arrow_map2, &rw_map));
    }

    // Round trip: BoolArray -> arrow BooleanArray -> BoolArray.
    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    // Round trip: I16Array -> arrow Int16Array -> I16Array.
    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    // Round trip: I32Array -> arrow Int32Array -> I32Array.
    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    // Round trip: I64Array -> arrow Int64Array -> I64Array.
    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    // Round trip: F32Array -> arrow Float32Array -> F32Array.
    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    // Round trip: F64Array -> arrow Float64Array -> F64Array.
    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    // Arrow Int8Array converts into an i16 array, including both i8 extremes.
    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    // Arrow UInt8Array converts into an i16 array.
    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    // Arrow UInt16Array converts into an i32 array, including u16::MAX.
    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    // Arrow UInt32Array converts into an i64 array, including u32::MAX.
    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    // Arrow UInt64Array converts into a decimal array, including u64::MAX.
    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    // Round trip: DateArray -> arrow Date32Array -> DateArray.
    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    // Round trip for the last microsecond of the day.
    #[test]
    fn time() {
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    // Round trip: TimestampArray -> arrow TimestampMicrosecondArray -> TimestampArray.
    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    // Round trip with both positive and negative month/day/microsecond components.
    #[test]
    fn interval() {
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    // Round trip: Utf8Array -> arrow StringArray -> Utf8Array.
    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    // Round trip: BytesArray -> arrow BinaryArray -> BytesArray.
    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    // Decimals (including NaN and both infinities) survive the textual encodings.
    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    // JSONB values survive both the large-string and the string encodings.
    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    // Round trip through arrow Decimal256Array for small, large and extreme Int256 values.
    #[test]
    fn int256() {
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}