1#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60pub trait ToArrow {
65 fn to_record_batch(
69 &self,
70 schema: arrow_schema::SchemaRef,
71 chunk: &DataChunk,
72 ) -> Result<arrow_array::RecordBatch, ArrayError> {
73 if !chunk.is_vis_compacted() {
75 let c = chunk.clone();
76 return self.to_record_batch(schema, &c.compact_vis());
77 }
78
79 let columns: Vec<_> = chunk
81 .columns()
82 .iter()
83 .zip_eq_fast(schema.fields().iter())
84 .map(|(column, field)| self.to_array(field.data_type(), column))
85 .try_collect()?;
86
87 let opts =
89 arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90 arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91 .map_err(ArrayError::to_arrow)
92 }
93
94 fn to_array(
96 &self,
97 data_type: &arrow_schema::DataType,
98 array: &ArrayImpl,
99 ) -> Result<arrow_array::ArrayRef, ArrayError> {
100 let arrow_array = match array {
101 ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102 ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103 ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104 ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105 ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106 ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107 ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108 ArrayImpl::Date(array) => self.date_to_arrow(array),
109 ArrayImpl::Time(array) => self.time_to_arrow(array),
110 ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111 ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112 ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113 ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114 ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115 ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116 ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117 ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118 ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119 ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120 ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121 ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122 }?;
123 if arrow_array.data_type() != data_type {
124 arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125 } else {
126 Ok(arrow_array)
127 }
128 }
129
130 #[inline]
131 fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132 Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133 }
134
135 #[inline]
136 fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137 Ok(Arc::new(arrow_array::Int16Array::from(array)))
138 }
139
140 #[inline]
141 fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142 Ok(Arc::new(arrow_array::Int32Array::from(array)))
143 }
144
145 #[inline]
146 fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147 Ok(Arc::new(arrow_array::Int64Array::from(array)))
148 }
149
150 #[inline]
151 fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152 Ok(Arc::new(arrow_array::Float32Array::from(array)))
153 }
154
155 #[inline]
156 fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157 Ok(Arc::new(arrow_array::Float64Array::from(array)))
158 }
159
160 #[inline]
161 fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162 Ok(Arc::new(arrow_array::StringArray::from(array)))
163 }
164
165 #[inline]
166 fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167 Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168 }
169
170 #[inline]
171 fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172 Ok(Arc::new(arrow_array::Date32Array::from(array)))
173 }
174
175 #[inline]
176 fn timestamp_to_arrow(
177 &self,
178 array: &TimestampArray,
179 ) -> Result<arrow_array::ArrayRef, ArrayError> {
180 Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181 array,
182 )))
183 }
184
185 #[inline]
186 fn timestamptz_to_arrow(
187 &self,
188 array: &TimestamptzArray,
189 ) -> Result<arrow_array::ArrayRef, ArrayError> {
190 Ok(Arc::new(
191 arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192 ))
193 }
194
195 #[inline]
196 fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197 Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198 }
199
200 #[inline]
201 fn interval_to_arrow(
202 &self,
203 array: &IntervalArray,
204 ) -> Result<arrow_array::ArrayRef, ArrayError> {
205 Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206 array,
207 )))
208 }
209
210 #[inline]
211 fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212 Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213 }
214
215 #[inline]
217 fn decimal_to_arrow(
218 &self,
219 _data_type: &arrow_schema::DataType,
220 array: &DecimalArray,
221 ) -> Result<arrow_array::ArrayRef, ArrayError> {
222 Ok(Arc::new(arrow_array::StringArray::from(array)))
223 }
224
225 #[inline]
227 fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228 Ok(Arc::new(arrow_array::StringArray::from(array)))
229 }
230
231 #[inline]
232 fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233 Ok(Arc::new(arrow_array::Int64Array::from(array)))
234 }
235
236 #[inline]
237 fn list_to_arrow(
238 &self,
239 data_type: &arrow_schema::DataType,
240 array: &ListArray,
241 ) -> Result<arrow_array::ArrayRef, ArrayError> {
242 let arrow_schema::DataType::List(field) = data_type else {
243 return Err(ArrayError::to_arrow("Invalid list type"));
244 };
245 let values = self.to_array(field.data_type(), array.values())?;
246 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248 Ok(Arc::new(arrow_array::ListArray::new(
249 field.clone(),
250 offsets,
251 values,
252 nulls,
253 )))
254 }
255
256 #[inline]
257 fn vector_to_arrow(
258 &self,
259 data_type: &arrow_schema::DataType,
260 array: &VectorArray,
261 ) -> Result<arrow_array::ArrayRef, ArrayError> {
262 let arrow_schema::DataType::List(field) = data_type else {
263 return Err(ArrayError::to_arrow("Invalid list type"));
264 };
265 if field.data_type() != &arrow_schema::DataType::Float32 {
266 return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267 }
268 let values = Arc::new(arrow_array::Float32Array::from(
269 array.as_raw_slice().to_vec(),
270 ));
271 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273 Ok(Arc::new(arrow_array::ListArray::new(
274 field.clone(),
275 offsets,
276 values,
277 nulls,
278 )))
279 }
280
281 #[inline]
282 fn struct_to_arrow(
283 &self,
284 data_type: &arrow_schema::DataType,
285 array: &StructArray,
286 ) -> Result<arrow_array::ArrayRef, ArrayError> {
287 let arrow_schema::DataType::Struct(fields) = data_type else {
288 return Err(ArrayError::to_arrow("Invalid struct type"));
289 };
290 Ok(Arc::new(arrow_array::StructArray::new(
291 fields.clone(),
292 array
293 .fields()
294 .zip_eq_fast(fields)
295 .map(|(arr, field)| self.to_array(field.data_type(), arr))
296 .try_collect::<_, _, ArrayError>()?,
297 Some(array.null_bitmap().into()),
298 )))
299 }
300
301 #[inline]
302 fn map_to_arrow(
303 &self,
304 data_type: &arrow_schema::DataType,
305 array: &MapArray,
306 ) -> Result<arrow_array::ArrayRef, ArrayError> {
307 let arrow_schema::DataType::Map(field, ordered) = data_type else {
308 return Err(ArrayError::to_arrow("Invalid map type"));
309 };
310 if *ordered {
311 return Err(ArrayError::to_arrow("Sorted map is not supported"));
312 }
313 let values = self
314 .struct_to_arrow(field.data_type(), array.as_struct())?
315 .as_struct()
316 .clone();
317 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
318 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
319 Ok(Arc::new(arrow_array::MapArray::new(
320 field.clone(),
321 offsets,
322 values,
323 nulls,
324 *ordered,
325 )))
326 }
327
328 fn to_arrow_field(
333 &self,
334 name: &str,
335 value: &DataType,
336 ) -> Result<arrow_schema::Field, ArrayError> {
337 let data_type = match value {
338 DataType::Boolean => self.bool_type_to_arrow(),
340 DataType::Int16 => self.int16_type_to_arrow(),
341 DataType::Int32 => self.int32_type_to_arrow(),
342 DataType::Int64 => self.int64_type_to_arrow(),
343 DataType::Int256 => self.int256_type_to_arrow(),
344 DataType::Float32 => self.float32_type_to_arrow(),
345 DataType::Float64 => self.float64_type_to_arrow(),
346 DataType::Date => self.date_type_to_arrow(),
347 DataType::Time => self.time_type_to_arrow(),
348 DataType::Timestamp => self.timestamp_type_to_arrow(),
349 DataType::Timestamptz => self.timestamptz_type_to_arrow(),
350 DataType::Interval => self.interval_type_to_arrow(),
351 DataType::Varchar => self.varchar_type_to_arrow(),
352 DataType::Bytea => self.bytea_type_to_arrow(),
353 DataType::Serial => self.serial_type_to_arrow(),
354 DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
355 DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
356 DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
357 DataType::List(list) => self.list_type_to_arrow(list)?,
358 DataType::Map(map) => self.map_type_to_arrow(map)?,
359 DataType::Vector(_) => self.vector_type_to_arrow()?,
360 };
361 Ok(arrow_schema::Field::new(name, data_type, true))
362 }
363
364 #[inline]
365 fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
366 arrow_schema::DataType::Boolean
367 }
368
369 #[inline]
370 fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
371 arrow_schema::DataType::Int16
372 }
373
374 #[inline]
375 fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
376 arrow_schema::DataType::Int32
377 }
378
379 #[inline]
380 fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
381 arrow_schema::DataType::Int64
382 }
383
384 #[inline]
385 fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
386 arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
387 }
388
389 #[inline]
390 fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
391 arrow_schema::DataType::Float32
392 }
393
394 #[inline]
395 fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
396 arrow_schema::DataType::Float64
397 }
398
399 #[inline]
400 fn date_type_to_arrow(&self) -> arrow_schema::DataType {
401 arrow_schema::DataType::Date32
402 }
403
404 #[inline]
405 fn time_type_to_arrow(&self) -> arrow_schema::DataType {
406 arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
407 }
408
409 #[inline]
410 fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
411 arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
412 }
413
414 #[inline]
415 fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
416 arrow_schema::DataType::Timestamp(
417 arrow_schema::TimeUnit::Microsecond,
418 Some("+00:00".into()),
419 )
420 }
421
422 #[inline]
423 fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
424 arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
425 }
426
427 #[inline]
428 fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
429 arrow_schema::DataType::Utf8
430 }
431
432 #[inline]
433 fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
434 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
435 .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
436 }
437
438 #[inline]
439 fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
440 arrow_schema::DataType::Binary
441 }
442
443 #[inline]
444 fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
445 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
446 .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
447 }
448
449 #[inline]
450 fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
451 arrow_schema::DataType::Int64
452 }
453
454 #[inline]
455 fn list_type_to_arrow(
456 &self,
457 list_type: &ListType,
458 ) -> Result<arrow_schema::DataType, ArrayError> {
459 Ok(arrow_schema::DataType::List(Arc::new(
460 self.to_arrow_field("item", list_type.elem())?,
461 )))
462 }
463
464 #[inline]
465 fn struct_type_to_arrow(
466 &self,
467 fields: &StructType,
468 ) -> Result<arrow_schema::DataType, ArrayError> {
469 Ok(arrow_schema::DataType::Struct(
470 fields
471 .iter()
472 .map(|(name, ty)| self.to_arrow_field(name, ty))
473 .try_collect::<_, _, ArrayError>()?,
474 ))
475 }
476
477 #[inline]
478 fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
479 let sorted = false;
480 let key = self
482 .to_arrow_field("key", map_type.key())?
483 .with_nullable(false);
484 let value = self.to_arrow_field("value", map_type.value())?;
485 Ok(arrow_schema::DataType::Map(
486 Arc::new(arrow_schema::Field::new(
487 "entries",
488 arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
489 false,
491 )),
492 sorted,
493 ))
494 }
495
496 #[inline]
497 fn vector_type_to_arrow(&self) -> Result<arrow_schema::DataType, ArrayError> {
498 Ok(arrow_schema::DataType::List(Arc::new(
499 self.to_arrow_field("item", &VECTOR_ITEM_TYPE)?,
500 )))
501 }
502}
503
504#[allow(clippy::wrong_self_convention)]
506pub trait FromArrow {
507 fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
509 let mut columns = Vec::with_capacity(batch.num_columns());
510 for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
511 let column = Arc::new(self.from_array(field, array)?);
512 columns.push(column);
513 }
514 Ok(DataChunk::new(columns, batch.num_rows()))
515 }
516
517 fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
519 Ok(StructType::new(
520 fields
521 .iter()
522 .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
523 .try_collect::<_, Vec<_>, ArrayError>()?,
524 ))
525 }
526
527 fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
529 use arrow_schema::DataType::*;
530 use arrow_schema::IntervalUnit::*;
531 use arrow_schema::TimeUnit::*;
532
533 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
535 return self.from_extension_type(type_name, field.data_type());
536 }
537
538 Ok(match field.data_type() {
539 Boolean => DataType::Boolean,
540 Int16 => DataType::Int16,
541 Int32 => DataType::Int32,
542 Int64 => DataType::Int64,
543 Int8 => DataType::Int16,
544 UInt8 => DataType::Int16,
545 UInt16 => DataType::Int32,
546 UInt32 => DataType::Int64,
547 UInt64 => DataType::Decimal,
548 Float16 => DataType::Float32,
549 Float32 => DataType::Float32,
550 Float64 => DataType::Float64,
551 Decimal128(_, _) => DataType::Decimal,
552 Decimal256(_, _) => DataType::Int256,
553 Date32 => DataType::Date,
554 Time64(Microsecond) => DataType::Time,
555 Timestamp(Microsecond, None) => DataType::Timestamp,
556 Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
557 Timestamp(Second, None) => DataType::Timestamp,
558 Timestamp(Second, Some(_)) => DataType::Timestamptz,
559 Timestamp(Millisecond, None) => DataType::Timestamp,
560 Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
561 Timestamp(Nanosecond, None) => DataType::Timestamp,
562 Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
563 Interval(MonthDayNano) => DataType::Interval,
564 Utf8 => DataType::Varchar,
565 Binary => DataType::Bytea,
566 LargeUtf8 => self.from_large_utf8()?,
567 LargeBinary => self.from_large_binary()?,
568 List(field) => DataType::list(self.from_field(field)?),
569 Struct(fields) => DataType::Struct(self.from_fields(fields)?),
570 Map(field, _is_sorted) => {
571 let entries = self.from_field(field)?;
572 DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
573 ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
574 })?)
575 }
576 t => {
577 return Err(ArrayError::from_arrow(format!(
578 "unsupported arrow data type: {t:?}"
579 )));
580 }
581 })
582 }
583
584 fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
586 Ok(DataType::Varchar)
587 }
588
589 fn from_large_binary(&self) -> Result<DataType, ArrayError> {
591 Ok(DataType::Bytea)
592 }
593
594 fn from_extension_type(
596 &self,
597 type_name: &str,
598 physical_type: &arrow_schema::DataType,
599 ) -> Result<DataType, ArrayError> {
600 match (type_name, physical_type) {
601 ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
602 ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
603 _ => Err(ArrayError::from_arrow(format!(
604 "unsupported extension type: {type_name:?}"
605 ))),
606 }
607 }
608
609 fn from_array(
611 &self,
612 field: &arrow_schema::Field,
613 array: &arrow_array::ArrayRef,
614 ) -> Result<ArrayImpl, ArrayError> {
615 use arrow_schema::DataType::*;
616 use arrow_schema::IntervalUnit::*;
617 use arrow_schema::TimeUnit::*;
618
619 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
621 return self.from_extension_array(type_name, array);
622 }
623 match array.data_type() {
624 Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
625 Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
626 Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
627 Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
628 Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
629 UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
630 UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
631 UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
632
633 UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
634 Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
635 Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
636 Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
637 Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
638 Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
639 Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
640 Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
641 Timestamp(Second, None) => {
642 self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
643 }
644 Timestamp(Second, Some(_)) => {
645 self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
646 }
647 Timestamp(Millisecond, None) => {
648 self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
649 }
650 Timestamp(Millisecond, Some(_)) => {
651 self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
652 }
653 Timestamp(Microsecond, None) => {
654 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
655 }
656 Timestamp(Microsecond, Some(_)) => {
657 self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
658 }
659 Timestamp(Nanosecond, None) => {
660 self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
661 }
662 Timestamp(Nanosecond, Some(_)) => {
663 self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
664 }
665 Interval(MonthDayNano) => {
666 self.from_interval_array(array.as_any().downcast_ref().unwrap())
667 }
668 Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
669 Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
670 LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
671 LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
672 List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
673 Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
674 Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
675 t => Err(ArrayError::from_arrow(format!(
676 "unsupported arrow data type: {t:?}",
677 ))),
678 }
679 }
680
681 fn from_extension_array(
683 &self,
684 type_name: &str,
685 array: &arrow_array::ArrayRef,
686 ) -> Result<ArrayImpl, ArrayError> {
687 match type_name {
688 "arrowudf.decimal" => {
689 let array: &arrow_array::StringArray =
690 array.as_any().downcast_ref().ok_or_else(|| {
691 ArrayError::from_arrow(
692 "expected string array for `arrowudf.decimal`".to_owned(),
693 )
694 })?;
695 Ok(ArrayImpl::Decimal(array.try_into()?))
696 }
697 "arrowudf.json" => {
698 let array: &arrow_array::StringArray =
699 array.as_any().downcast_ref().ok_or_else(|| {
700 ArrayError::from_arrow(
701 "expected string array for `arrowudf.json`".to_owned(),
702 )
703 })?;
704 Ok(ArrayImpl::Jsonb(array.try_into()?))
705 }
706 _ => Err(ArrayError::from_arrow(format!(
707 "unsupported extension type: {type_name:?}"
708 ))),
709 }
710 }
711
712 fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
713 Ok(ArrayImpl::Bool(array.into()))
714 }
715
716 fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
717 Ok(ArrayImpl::Int16(array.into()))
718 }
719
720 fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
721 Ok(ArrayImpl::Int16(array.into()))
722 }
723
724 fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
725 Ok(ArrayImpl::Int16(array.into()))
726 }
727
728 fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
729 Ok(ArrayImpl::Int32(array.into()))
730 }
731
732 fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
733 Ok(ArrayImpl::Int64(array.into()))
734 }
735
736 fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
737 Ok(ArrayImpl::Int32(array.into()))
738 }
739
740 fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
741 Ok(ArrayImpl::Int64(array.into()))
742 }
743
744 fn from_int256_array(
745 &self,
746 array: &arrow_array::Decimal256Array,
747 ) -> Result<ArrayImpl, ArrayError> {
748 Ok(ArrayImpl::Int256(array.into()))
749 }
750
751 fn from_decimal128_array(
752 &self,
753 array: &arrow_array::Decimal128Array,
754 ) -> Result<ArrayImpl, ArrayError> {
755 Ok(ArrayImpl::Decimal(array.try_into()?))
756 }
757
758 fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
759 Ok(ArrayImpl::Decimal(array.try_into()?))
760 }
761
762 fn from_float16_array(
763 &self,
764 array: &arrow_array::Float16Array,
765 ) -> Result<ArrayImpl, ArrayError> {
766 Ok(ArrayImpl::Float32(array.try_into()?))
767 }
768
769 fn from_float32_array(
770 &self,
771 array: &arrow_array::Float32Array,
772 ) -> Result<ArrayImpl, ArrayError> {
773 Ok(ArrayImpl::Float32(array.into()))
774 }
775
776 fn from_float64_array(
777 &self,
778 array: &arrow_array::Float64Array,
779 ) -> Result<ArrayImpl, ArrayError> {
780 Ok(ArrayImpl::Float64(array.into()))
781 }
782
783 fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
784 Ok(ArrayImpl::Date(array.into()))
785 }
786
787 fn from_time64us_array(
788 &self,
789 array: &arrow_array::Time64MicrosecondArray,
790 ) -> Result<ArrayImpl, ArrayError> {
791 Ok(ArrayImpl::Time(array.into()))
792 }
793
794 fn from_timestampsecond_array(
795 &self,
796 array: &arrow_array::TimestampSecondArray,
797 ) -> Result<ArrayImpl, ArrayError> {
798 Ok(ArrayImpl::Timestamp(array.into()))
799 }
800 fn from_timestampsecond_some_array(
801 &self,
802 array: &arrow_array::TimestampSecondArray,
803 ) -> Result<ArrayImpl, ArrayError> {
804 Ok(ArrayImpl::Timestamptz(array.into()))
805 }
806
807 fn from_timestampms_array(
808 &self,
809 array: &arrow_array::TimestampMillisecondArray,
810 ) -> Result<ArrayImpl, ArrayError> {
811 Ok(ArrayImpl::Timestamp(array.into()))
812 }
813
814 fn from_timestampms_some_array(
815 &self,
816 array: &arrow_array::TimestampMillisecondArray,
817 ) -> Result<ArrayImpl, ArrayError> {
818 Ok(ArrayImpl::Timestamptz(array.into()))
819 }
820
821 fn from_timestampus_array(
822 &self,
823 array: &arrow_array::TimestampMicrosecondArray,
824 ) -> Result<ArrayImpl, ArrayError> {
825 Ok(ArrayImpl::Timestamp(array.into()))
826 }
827
828 fn from_timestampus_some_array(
829 &self,
830 array: &arrow_array::TimestampMicrosecondArray,
831 ) -> Result<ArrayImpl, ArrayError> {
832 Ok(ArrayImpl::Timestamptz(array.into()))
833 }
834
835 fn from_timestampns_array(
836 &self,
837 array: &arrow_array::TimestampNanosecondArray,
838 ) -> Result<ArrayImpl, ArrayError> {
839 Ok(ArrayImpl::Timestamp(array.into()))
840 }
841
842 fn from_timestampns_some_array(
843 &self,
844 array: &arrow_array::TimestampNanosecondArray,
845 ) -> Result<ArrayImpl, ArrayError> {
846 Ok(ArrayImpl::Timestamptz(array.into()))
847 }
848
849 fn from_interval_array(
850 &self,
851 array: &arrow_array::IntervalMonthDayNanoArray,
852 ) -> Result<ArrayImpl, ArrayError> {
853 Ok(ArrayImpl::Interval(array.into()))
854 }
855
856 fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
857 Ok(ArrayImpl::Utf8(array.into()))
858 }
859
860 fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
861 Ok(ArrayImpl::Bytea(array.into()))
862 }
863
864 fn from_large_utf8_array(
865 &self,
866 array: &arrow_array::LargeStringArray,
867 ) -> Result<ArrayImpl, ArrayError> {
868 Ok(ArrayImpl::Utf8(array.into()))
869 }
870
871 fn from_large_binary_array(
872 &self,
873 array: &arrow_array::LargeBinaryArray,
874 ) -> Result<ArrayImpl, ArrayError> {
875 Ok(ArrayImpl::Bytea(array.into()))
876 }
877
878 fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
879 use arrow_array::Array;
880 let arrow_schema::DataType::List(field) = array.data_type() else {
881 panic!("nested field types cannot be determined.");
882 };
883 Ok(ArrayImpl::List(ListArray {
884 value: Box::new(self.from_array(field, array.values())?),
885 bitmap: match array.nulls() {
886 Some(nulls) => nulls.iter().collect(),
887 None => Bitmap::ones(array.len()),
888 },
889 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
890 }))
891 }
892
893 fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
894 use arrow_array::Array;
895 let arrow_schema::DataType::Struct(fields) = array.data_type() else {
896 panic!("nested field types cannot be determined.");
897 };
898 Ok(ArrayImpl::Struct(StructArray::new(
899 self.from_fields(fields)?,
900 array
901 .columns()
902 .iter()
903 .zip_eq_fast(fields)
904 .map(|(array, field)| self.from_array(field, array).map(Arc::new))
905 .try_collect()?,
906 (0..array.len()).map(|i| array.is_valid(i)).collect(),
907 )))
908 }
909
910 fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
911 use arrow_array::Array;
912 let struct_array = self.from_struct_array(array.entries())?;
913 let list_array = ListArray {
914 value: Box::new(struct_array),
915 bitmap: match array.nulls() {
916 Some(nulls) => nulls.iter().collect(),
917 None => Bitmap::ones(array.len()),
918 },
919 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
920 };
921
922 Ok(ArrayImpl::Map(MapArray { inner: list_array }))
923 }
924}
925
926impl From<&Bitmap> for arrow_buffer::NullBuffer {
927 fn from(bitmap: &Bitmap) -> Self {
928 bitmap.iter().collect()
929 }
930}
931
/// Generates `From` conversions between a RisingWave array type and an Arrow array type.
///
/// Each invocation emits three impls: RisingWave -> Arrow, Arrow -> RisingWave,
/// and a slice of Arrow arrays -> a single RisingWave array holding the items of
/// all arrays in order.
///
/// * `converts!(A, B)` passes the iterated items across unchanged.
/// * `converts!(A, B, @map)` routes every non-null item through `FromIntoArrow`.
macro_rules! converts {
    ($RwArray:ty, $ArrowArray:ty) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(array: &$RwArray) -> Self {
                Self::from_iter(array.iter())
            }
        }
        impl From<&$ArrowArray> for $RwArray {
            fn from(array: &$ArrowArray) -> Self {
                Self::from_iter(array.iter())
            }
        }
        impl From<&[$ArrowArray]> for $RwArray {
            fn from(arrays: &[$ArrowArray]) -> Self {
                // Chain the items of all arrays into one sequence.
                let items = arrays.iter().flat_map(|chunk| chunk.iter());
                Self::from_iter(items)
            }
        }
    };
    ($RwArray:ty, $ArrowArray:ty, @map) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(array: &$RwArray) -> Self {
                let items = array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow()));
                Self::from_iter(items)
            }
        }
        impl From<&$ArrowArray> for $RwArray {
            fn from(array: &$ArrowArray) -> Self {
                let items = array.iter().map(|item| {
                    item.map(|value| {
                        <<$RwArray as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                    })
                });
                Self::from_iter(items)
            }
        }
        impl From<&[$ArrowArray]> for $RwArray {
            fn from(arrays: &[$ArrowArray]) -> Self {
                let items = arrays.iter().flat_map(|chunk| chunk.iter()).map(|item| {
                    item.map(|value| {
                        <<$RwArray as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                    })
                });
                Self::from_iter(items)
            }
        }
    };
}
985
/// Generates `From` conversions between a RisingWave array and an Arrow array
/// whose element types differ, using `as` casts on each non-null item.
///
/// Arguments: the RisingWave array type, the Arrow array type, the RisingWave
/// element type (`$FromType`) and the Arrow element type (`$ToType`).
///
/// Note that `as` follows Rust's numeric cast rules: the RisingWave -> Arrow
/// direction wraps or truncates values that do not fit into `$ToType`.
///
/// The items are collected straight into the target array; no intermediate
/// `Vec` is allocated.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|v| v as $ToType))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|v| v as $FromType))
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                // Items of all arrays are appended in order.
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|item| item.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
1016
/// Generates `From` conversions between a RisingWave timestamp-like array and an
/// Arrow timestamp array of a given time unit.
///
/// Every non-null item is converted through `FromIntoArrowWithUnit`, passing
/// `$time_unit` along. As with `converts!`, three impls are emitted: RisingWave
/// -> Arrow, Arrow -> RisingWave, and a slice of Arrow arrays -> one RisingWave
/// array.
macro_rules! converts_with_timeunit {
    ($RwArray:ty, $ArrowArray:ty, $unit:expr, @map) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(array: &$RwArray) -> Self {
                let items = array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow_with_unit($unit)));
                Self::from_iter(items)
            }
        }

        impl From<&$ArrowArray> for $RwArray {
            fn from(array: &$ArrowArray) -> Self {
                let items = array.iter().map(|item| {
                    item.map(|value| {
                        <<$RwArray as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(
                            value, $unit,
                        )
                    })
                });
                Self::from_iter(items)
            }
        }

        impl From<&[$ArrowArray]> for $RwArray {
            fn from(arrays: &[$ArrowArray]) -> Self {
                let items = arrays.iter().flat_map(|chunk| chunk.iter()).map(|item| {
                    item.map(|value| {
                        <<$RwArray as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(
                            value, $unit,
                        )
                    })
                });
                Self::from_iter(items)
            }
        }
    };
}
1053
// Conversions between RisingWave arrays and the matching Arrow arrays.
// Plain invocations pass items across unchanged; `@map` invocations convert
// each non-null item through `FromIntoArrow`.
converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
converts!(I64Array, arrow_array::Int64Array);
converts!(F32Array, arrow_array::Float32Array, @map);
converts!(F64Array, arrow_array::Float64Array, @map);
converts!(BytesArray, arrow_array::BinaryArray);
converts!(BytesArray, arrow_array::LargeBinaryArray);
converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

// Arrow integer types without a same-width RisingWave array: items are cast
// with `as` to the element type of the listed RisingWave array.
converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);

// `TimestampArray` against Arrow timestamp arrays of every time unit.
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

// `TimestamptzArray` against Arrow timestamp arrays of every time unit.
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1083
/// Converts a single scalar to and from the value type Arrow stores for it.
trait FromIntoArrow {
    /// The Arrow-side value type paired with this scalar.
    type ArrowType;

    /// Converts this scalar into its Arrow value.
    fn into_arrow(self) -> Self::ArrowType;

    /// Builds the scalar from an Arrow value.
    fn from_arrow(value: Self::ArrowType) -> Self;
}
1091
/// Like `FromIntoArrow`, but the Arrow value only has a meaning together with a unit
/// (the implementations in this file use Arrow's `TimeUnit`).
trait FromIntoArrowWithUnit {
    /// The unit descriptor passed alongside every conversion.
    type TimestampType;
    /// The Arrow-side value type paired with this scalar.
    type ArrowType;

    /// Converts this scalar into an Arrow value expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;

    /// Builds the scalar from an Arrow value expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
}
1101
1102impl FromIntoArrow for Serial {
1103 type ArrowType = i64;
1104
1105 fn from_arrow(value: Self::ArrowType) -> Self {
1106 value.into()
1107 }
1108
1109 fn into_arrow(self) -> Self::ArrowType {
1110 self.into()
1111 }
1112}
1113
1114impl FromIntoArrow for F32 {
1115 type ArrowType = f32;
1116
1117 fn from_arrow(value: Self::ArrowType) -> Self {
1118 value.into()
1119 }
1120
1121 fn into_arrow(self) -> Self::ArrowType {
1122 self.into()
1123 }
1124}
1125
1126impl FromIntoArrow for F64 {
1127 type ArrowType = f64;
1128
1129 fn from_arrow(value: Self::ArrowType) -> Self {
1130 value.into()
1131 }
1132
1133 fn into_arrow(self) -> Self::ArrowType {
1134 self.into()
1135 }
1136}
1137
1138impl FromIntoArrow for Date {
1139 type ArrowType = i32;
1140
1141 fn from_arrow(value: Self::ArrowType) -> Self {
1142 Date(arrow_array::types::Date32Type::to_naive_date(value))
1143 }
1144
1145 fn into_arrow(self) -> Self::ArrowType {
1146 arrow_array::types::Date32Type::from_naive_date(self.0)
1147 }
1148}
1149
1150impl FromIntoArrow for Time {
1151 type ArrowType = i64;
1152
1153 fn from_arrow(value: Self::ArrowType) -> Self {
1154 Time(
1155 NaiveTime::from_num_seconds_from_midnight_opt(
1156 (value / 1_000_000) as _,
1157 (value % 1_000_000 * 1000) as _,
1158 )
1159 .unwrap(),
1160 )
1161 }
1162
1163 fn into_arrow(self) -> Self::ArrowType {
1164 self.0
1165 .signed_duration_since(NaiveTime::default())
1166 .num_microseconds()
1167 .unwrap()
1168 }
1169}
1170
1171impl FromIntoArrowWithUnit for Timestamp {
1172 type ArrowType = i64;
1173 type TimestampType = TimeUnit;
1174
1175 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1176 match time_unit {
1177 TimeUnit::Second => {
1178 Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1179 }
1180 TimeUnit::Millisecond => {
1181 Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1182 }
1183 TimeUnit::Microsecond => {
1184 Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1185 }
1186 TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1187 }
1188 }
1189
1190 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1191 match time_unit {
1192 TimeUnit::Second => self.0.and_utc().timestamp(),
1193 TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1194 TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1195 TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1196 }
1197 }
1198}
1199
1200impl FromIntoArrowWithUnit for Timestamptz {
1201 type ArrowType = i64;
1202 type TimestampType = TimeUnit;
1203
1204 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1205 match time_unit {
1206 TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1207 TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1208 TimeUnit::Microsecond => Timestamptz::from_micros(value),
1209 TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1210 }
1211 }
1212
1213 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1214 match time_unit {
1215 TimeUnit::Second => self.timestamp(),
1216 TimeUnit::Millisecond => self.timestamp_millis(),
1217 TimeUnit::Microsecond => self.timestamp_micros(),
1218 TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1219 }
1220 }
1221}
1222
1223impl FromIntoArrow for Interval {
1224 type ArrowType = ArrowIntervalType;
1225
1226 fn from_arrow(value: Self::ArrowType) -> Self {
1227 Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1228 }
1229
1230 fn into_arrow(self) -> Self::ArrowType {
1231 ArrowIntervalType {
1232 months: self.months(),
1233 days: self.days(),
1234 nanoseconds: self.usecs() * 1000,
1236 }
1237 }
1238}
1239
1240impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1241 fn from(array: &DecimalArray) -> Self {
1242 let mut builder =
1243 arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1244 for value in array.iter() {
1245 builder.append_option(value.map(|d| d.to_string()));
1246 }
1247 builder.finish()
1248 }
1249}
1250
1251impl From<&DecimalArray> for arrow_array::StringArray {
1252 fn from(array: &DecimalArray) -> Self {
1253 let mut builder =
1254 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1255 for value in array.iter() {
1256 builder.append_option(value.map(|d| d.to_string()));
1257 }
1258 builder.finish()
1259 }
1260}
1261
1262impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1264 type Error = ArrayError;
1265
1266 fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1267 if array.scale() < 0 {
1268 bail!("support negative scale for arrow decimal")
1269 }
1270
1271 let precision = array.precision();
1274 let max_value = 10_i128.pow(precision as u32) - 1;
1275
1276 let from_arrow = |value| {
1277 const NAN: i128 = i128::MIN + 1;
1278 let res = match value {
1279 NAN => Decimal::NaN,
1281 v if v == max_value => Decimal::PositiveInf,
1282 v if v == -max_value => Decimal::NegativeInf,
1283 i128::MAX => Decimal::PositiveInf, i128::MIN => Decimal::NegativeInf, _ => Decimal::truncated_i128_and_scale(value, array.scale() as u32)
1286 .ok_or_else(|| ArrayError::from_arrow("decimal overflow"))?,
1287 };
1288 Ok(res)
1289 };
1290 array
1291 .iter()
1292 .map(|o| o.map(from_arrow).transpose())
1293 .collect::<Result<Self, Self::Error>>()
1294 }
1295}
1296
1297impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1299 type Error = ArrayError;
1300
1301 fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1302 let from_arrow = |value| {
1303 let res = Decimal::from(value);
1305 Ok(res)
1306 };
1307
1308 array
1310 .iter()
1311 .map(|o| o.map(from_arrow).transpose())
1312 .collect::<Result<Self, Self::Error>>()
1313 }
1314}
1315
1316impl TryFrom<&arrow_array::Float16Array> for F32Array {
1317 type Error = ArrayError;
1318
1319 fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1320 let from_arrow = |value| Ok(f32::from(value));
1321
1322 array
1323 .iter()
1324 .map(|o| o.map(from_arrow).transpose())
1325 .collect::<Result<Self, Self::Error>>()
1326 }
1327}
1328
1329impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1330 type Error = ArrayError;
1331
1332 fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1333 array
1334 .iter()
1335 .map(|o| {
1336 o.map(|s| {
1337 let s = std::str::from_utf8(s)
1338 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1339 s.parse()
1340 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1341 })
1342 .transpose()
1343 })
1344 .try_collect()
1345 }
1346}
1347
1348impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1349 type Error = ArrayError;
1350
1351 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1352 array
1353 .iter()
1354 .map(|o| {
1355 o.map(|s| {
1356 s.parse()
1357 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1358 })
1359 .transpose()
1360 })
1361 .try_collect()
1362 }
1363}
1364
1365impl From<&JsonbArray> for arrow_array::StringArray {
1366 fn from(array: &JsonbArray) -> Self {
1367 let mut builder =
1368 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1369 for value in array.iter() {
1370 match value {
1371 Some(jsonb) => {
1372 write!(&mut builder, "{}", jsonb).unwrap();
1373 builder.append_value("");
1374 }
1375 None => builder.append_null(),
1376 }
1377 }
1378 builder.finish()
1379 }
1380}
1381
1382impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1383 type Error = ArrayError;
1384
1385 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1386 array
1387 .iter()
1388 .map(|o| {
1389 o.map(|s| {
1390 s.parse()
1391 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1392 })
1393 .transpose()
1394 })
1395 .try_collect()
1396 }
1397}
1398
1399impl From<&IntervalArray> for arrow_array::StringArray {
1400 fn from(array: &IntervalArray) -> Self {
1401 let mut builder =
1402 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1403 for value in array.iter() {
1404 match value {
1405 Some(interval) => {
1406 write!(&mut builder, "{}", interval).unwrap();
1407 builder.append_value("");
1408 }
1409 None => builder.append_null(),
1410 }
1411 }
1412 builder.finish()
1413 }
1414}
1415
1416impl From<&JsonbArray> for arrow_array::LargeStringArray {
1417 fn from(array: &JsonbArray) -> Self {
1418 let mut builder =
1419 arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1420 for value in array.iter() {
1421 match value {
1422 Some(jsonb) => {
1423 write!(&mut builder, "{}", jsonb).unwrap();
1424 builder.append_value("");
1425 }
1426 None => builder.append_null(),
1427 }
1428 }
1429 builder.finish()
1430 }
1431}
1432
1433impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1434 type Error = ArrayError;
1435
1436 fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1437 array
1438 .iter()
1439 .map(|o| {
1440 o.map(|s| {
1441 s.parse()
1442 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1443 })
1444 .transpose()
1445 })
1446 .try_collect()
1447 }
1448}
1449
1450impl From<arrow_buffer::i256> for Int256 {
1451 fn from(value: arrow_buffer::i256) -> Self {
1452 let buffer = value.to_be_bytes();
1453 Int256::from_be_bytes(buffer)
1454 }
1455}
1456
1457impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1458 fn from(val: Int256Ref<'a>) -> Self {
1459 let buffer = val.to_be_bytes();
1460 arrow_buffer::i256::from_be_bytes(buffer)
1461 }
1462}
1463
1464impl From<&Int256Array> for arrow_array::Decimal256Array {
1465 fn from(array: &Int256Array) -> Self {
1466 array
1467 .iter()
1468 .map(|o| o.map(arrow_buffer::i256::from))
1469 .collect()
1470 }
1471}
1472
1473impl From<&arrow_array::Decimal256Array> for Int256Array {
1474 fn from(array: &arrow_array::Decimal256Array) -> Self {
1475 let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1476
1477 values
1478 .iter()
1479 .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1480 .collect()
1481 }
1482}
1483
1484pub fn is_parquet_schema_match_source_schema(
1500 arrow_data_type: &arrow_schema::DataType,
1501 rw_data_type: &crate::types::DataType,
1502) -> bool {
1503 use arrow_schema::DataType as ArrowType;
1504
1505 use crate::types::{DataType as RwType, MapType, StructType};
1506
1507 match (arrow_data_type, rw_data_type) {
1508 (ArrowType::Boolean, RwType::Boolean)
1510 | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1511 | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1512 | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1513 | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1514 | (ArrowType::Decimal256(_, _), RwType::Int256)
1515 | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1516 | (ArrowType::Float64, RwType::Float64)
1517 | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1518 | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1519 | (ArrowType::Date32, RwType::Date)
1520 | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1521 | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1522 | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1523 | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1524
1525 (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1528 if arrow_fields.len() != rw_struct.len() {
1529 return false;
1530 }
1531 for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1532 {
1533 if arrow_field.name() != rw_name {
1534 return false;
1535 }
1536 if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1537 return false;
1538 }
1539 }
1540 true
1541 }
1542 (ArrowType::List(arrow_field), RwType::List(rw_list_ty)) => {
1545 is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_list_ty.elem())
1546 }
1547 (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1551 if let ArrowType::Struct(fields) = arrow_field.data_type() {
1552 if fields.len() != 2 {
1553 return false;
1554 }
1555 let key_field = &fields[0];
1556 let value_field = &fields[1];
1557 if key_field.name() != "key" || value_field.name() != "value" {
1558 return false;
1559 }
1560 let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1561 is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1562 && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1563 } else {
1564 false
1565 }
1566 }
1567 _ => false,
1569 }
1570}
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Structs match only when field names and field types agree position by position.
    #[test]
    fn test_struct_schema_match() {
        let matching_arrow = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        let rw_ty = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &matching_arrow,
            &rw_ty
        ));

        // Same types, but the second field carries a different name.
        let renamed_arrow = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &renamed_arrow,
            &rw_ty
        ));
    }

    /// Lists match only when their element types match.
    #[test]
    fn test_list_schema_match() {
        let arrow_ty =
            ArrowType::List(Box::new(ArrowField::new("item", ArrowType::Float64, true)).into());
        let same_elem = RwType::Float64.list();
        assert!(is_parquet_schema_match_source_schema(&arrow_ty, &same_elem));

        let other_elem = RwType::Int32.list();
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_ty,
            &other_elem
        ));
    }

    /// Maps match only when the entry struct is `{key, value}` and both types match.
    #[test]
    fn test_map_schema_match() {
        let arrow_ty = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("key", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        let matching_rw = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_ty, &matching_rw));

        // Key type differs.
        let wrong_key_rw = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_ty, &wrong_key_rw));

        // Value type differs.
        let wrong_value_rw = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_ty, &wrong_value_rw));

        // The entry struct's key field is named "k" instead of "key".
        let misnamed_arrow = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("k", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        assert!(!is_parquet_schema_match_source_schema(&misnamed_arrow, &matching_rw));
    }

    /// Round trip: `BoolArray` -> Arrow -> `BoolArray`.
    #[test]
    fn bool() {
        let expected = BoolArray::from_iter([None, Some(false), Some(true)]);
        let converted = arrow_array::BooleanArray::from(&expected);
        assert_eq!(BoolArray::from(&converted), expected);
    }

    /// Round trip: `I16Array` -> Arrow -> `I16Array`.
    #[test]
    fn i16() {
        let expected = I16Array::from_iter([None, Some(-7), Some(25)]);
        let converted = arrow_array::Int16Array::from(&expected);
        assert_eq!(I16Array::from(&converted), expected);
    }

    /// Round trip: `I32Array` -> Arrow -> `I32Array`.
    #[test]
    fn i32() {
        let expected = I32Array::from_iter([None, Some(-7), Some(25)]);
        let converted = arrow_array::Int32Array::from(&expected);
        assert_eq!(I32Array::from(&converted), expected);
    }

    /// Round trip: `I64Array` -> Arrow -> `I64Array`.
    #[test]
    fn i64() {
        let expected = I64Array::from_iter([None, Some(-7), Some(25)]);
        let converted = arrow_array::Int64Array::from(&expected);
        assert_eq!(I64Array::from(&converted), expected);
    }

    /// Round trip: `F32Array` -> Arrow -> `F32Array`.
    #[test]
    fn f32() {
        let expected = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let converted = arrow_array::Float32Array::from(&expected);
        assert_eq!(F32Array::from(&converted), expected);
    }

    /// Round trip: `F64Array` -> Arrow -> `F64Array`.
    #[test]
    fn f64() {
        let expected = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let converted = arrow_array::Float64Array::from(&expected);
        assert_eq!(F64Array::from(&converted), expected);
    }

    /// Arrow `Int8Array` widens into an `i16` array, including both extremes.
    #[test]
    fn int8() {
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let source = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let actual: PrimitiveArray<i16> = (&source).into();
        assert_eq!(actual, expected);
    }

    /// Arrow `UInt8Array` widens into an `i16` array.
    #[test]
    fn uint8() {
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let source = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let actual: PrimitiveArray<i16> = (&source).into();
        assert_eq!(actual, expected);
    }

    /// Arrow `UInt16Array` widens into an `i32` array, including `u16::MAX`.
    #[test]
    fn uint16() {
        let expected: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let source = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let actual: PrimitiveArray<i32> = (&source).into();
        assert_eq!(actual, expected);
    }

    /// Arrow `UInt32Array` widens into an `i64` array, including `u32::MAX`.
    #[test]
    fn uint32() {
        let expected: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let source = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let actual: PrimitiveArray<i64> = (&source).into();
        assert_eq!(actual, expected);
    }

    /// Arrow `UInt64Array` converts into a decimal array, including `u64::MAX`.
    #[test]
    fn uint64() {
        let expected: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let source =
            arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let actual: PrimitiveArray<Decimal> = (&source).try_into().unwrap();
        assert_eq!(actual, expected);
    }

    /// Round trip: `DateArray` -> Arrow `Date32Array` -> `DateArray`.
    #[test]
    fn date() {
        let expected = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let converted = arrow_array::Date32Array::from(&expected);
        assert_eq!(DateArray::from(&converted), expected);
    }

    /// Round trip for the last microsecond of the day.
    #[test]
    fn time() {
        let expected =
            TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let converted = arrow_array::Time64MicrosecondArray::from(&expected);
        assert_eq!(TimeArray::from(&converted), expected);
    }

    /// Round trip: `TimestampArray` -> Arrow microsecond timestamps -> `TimestampArray`.
    #[test]
    fn timestamp() {
        let expected =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let converted = arrow_array::TimestampMicrosecondArray::from(&expected);
        assert_eq!(TimestampArray::from(&converted), expected);
    }

    /// Round trip for positive and negative intervals.
    #[test]
    fn interval() {
        let expected = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let converted = arrow_array::IntervalMonthDayNanoArray::from(&expected);
        assert_eq!(IntervalArray::from(&converted), expected);
    }

    /// Round trip: `Utf8Array` -> Arrow `StringArray` -> `Utf8Array`.
    #[test]
    fn string() {
        let expected = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let converted = arrow_array::StringArray::from(&expected);
        assert_eq!(Utf8Array::from(&converted), expected);
    }

    /// Round trip: `BytesArray` -> Arrow `BinaryArray` -> `BytesArray`.
    #[test]
    fn binary() {
        let expected = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let converted = arrow_array::BinaryArray::from(&expected);
        assert_eq!(BytesArray::from(&converted), expected);
    }

    /// Decimals (including NaN and both infinities) survive the text-based encodings.
    #[test]
    fn decimal() {
        let expected = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let as_binary = arrow_array::LargeBinaryArray::from(&expected);
        assert_eq!(DecimalArray::try_from(&as_binary).unwrap(), expected);

        let as_string = arrow_array::StringArray::from(&expected);
        assert_eq!(DecimalArray::try_from(&as_string).unwrap(), expected);
    }

    /// Jsonb values survive both the large-string and the string encodings.
    #[test]
    fn jsonb() {
        let expected = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let as_large_string = arrow_array::LargeStringArray::from(&expected);
        assert_eq!(JsonbArray::try_from(&as_large_string).unwrap(), expected);

        let as_string = arrow_array::StringArray::from(&expected);
        assert_eq!(JsonbArray::try_from(&as_string).unwrap(), expected);
    }

    /// Round trip for 256-bit integers of growing magnitude plus both extremes.
    #[test]
    fn int256() {
        let samples = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let expected = Int256Array::from_iter(
            samples
                .iter()
                .map(|sample| sample.as_ref().map(|int| int.as_scalar_ref())),
        );
        let converted = arrow_array::Decimal256Array::from(&expected);
        assert_eq!(Int256Array::from(&converted), expected);
    }
}