1#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55use crate::array::*;
57use crate::types::{DataType as RwDataType, *};
58use crate::util::iter_util::ZipEqFast;
59
60pub trait ToArrow {
65 fn to_record_batch(
69 &self,
70 schema: arrow_schema::SchemaRef,
71 chunk: &DataChunk,
72 ) -> Result<arrow_array::RecordBatch, ArrayError> {
73 if !chunk.is_compacted() {
75 let c = chunk.clone();
76 return self.to_record_batch(schema, &c.compact());
77 }
78
79 let columns: Vec<_> = chunk
81 .columns()
82 .iter()
83 .zip_eq_fast(schema.fields().iter())
84 .map(|(column, field)| self.to_array(field.data_type(), column))
85 .try_collect()?;
86
87 let opts =
89 arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90 arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91 .map_err(ArrayError::to_arrow)
92 }
93
94 fn to_array(
96 &self,
97 data_type: &arrow_schema::DataType,
98 array: &ArrayImpl,
99 ) -> Result<arrow_array::ArrayRef, ArrayError> {
100 let arrow_array = match array {
101 ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102 ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103 ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104 ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105 ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106 ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107 ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108 ArrayImpl::Date(array) => self.date_to_arrow(array),
109 ArrayImpl::Time(array) => self.time_to_arrow(array),
110 ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111 ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112 ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113 ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114 ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115 ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116 ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117 ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118 ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119 ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120 ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121 ArrayImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
122 }?;
123 if arrow_array.data_type() != data_type {
124 arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125 } else {
126 Ok(arrow_array)
127 }
128 }
129
130 #[inline]
131 fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132 Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133 }
134
135 #[inline]
136 fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137 Ok(Arc::new(arrow_array::Int16Array::from(array)))
138 }
139
140 #[inline]
141 fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142 Ok(Arc::new(arrow_array::Int32Array::from(array)))
143 }
144
145 #[inline]
146 fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147 Ok(Arc::new(arrow_array::Int64Array::from(array)))
148 }
149
150 #[inline]
151 fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152 Ok(Arc::new(arrow_array::Float32Array::from(array)))
153 }
154
155 #[inline]
156 fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157 Ok(Arc::new(arrow_array::Float64Array::from(array)))
158 }
159
160 #[inline]
161 fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162 Ok(Arc::new(arrow_array::StringArray::from(array)))
163 }
164
165 #[inline]
166 fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167 Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168 }
169
170 #[inline]
171 fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172 Ok(Arc::new(arrow_array::Date32Array::from(array)))
173 }
174
175 #[inline]
176 fn timestamp_to_arrow(
177 &self,
178 array: &TimestampArray,
179 ) -> Result<arrow_array::ArrayRef, ArrayError> {
180 Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181 array,
182 )))
183 }
184
185 #[inline]
186 fn timestamptz_to_arrow(
187 &self,
188 array: &TimestamptzArray,
189 ) -> Result<arrow_array::ArrayRef, ArrayError> {
190 Ok(Arc::new(
191 arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192 ))
193 }
194
195 #[inline]
196 fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197 Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198 }
199
200 #[inline]
201 fn interval_to_arrow(
202 &self,
203 array: &IntervalArray,
204 ) -> Result<arrow_array::ArrayRef, ArrayError> {
205 Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206 array,
207 )))
208 }
209
210 #[inline]
211 fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212 Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213 }
214
215 #[inline]
217 fn decimal_to_arrow(
218 &self,
219 _data_type: &arrow_schema::DataType,
220 array: &DecimalArray,
221 ) -> Result<arrow_array::ArrayRef, ArrayError> {
222 Ok(Arc::new(arrow_array::StringArray::from(array)))
223 }
224
225 #[inline]
227 fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228 Ok(Arc::new(arrow_array::StringArray::from(array)))
229 }
230
231 #[inline]
232 fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233 Ok(Arc::new(arrow_array::Int64Array::from(array)))
234 }
235
236 #[inline]
237 fn list_to_arrow(
238 &self,
239 data_type: &arrow_schema::DataType,
240 array: &ListArray,
241 ) -> Result<arrow_array::ArrayRef, ArrayError> {
242 let arrow_schema::DataType::List(field) = data_type else {
243 return Err(ArrayError::to_arrow("Invalid list type"));
244 };
245 let values = self.to_array(field.data_type(), array.values())?;
246 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248 Ok(Arc::new(arrow_array::ListArray::new(
249 field.clone(),
250 offsets,
251 values,
252 nulls,
253 )))
254 }
255
256 #[inline]
257 fn struct_to_arrow(
258 &self,
259 data_type: &arrow_schema::DataType,
260 array: &StructArray,
261 ) -> Result<arrow_array::ArrayRef, ArrayError> {
262 let arrow_schema::DataType::Struct(fields) = data_type else {
263 return Err(ArrayError::to_arrow("Invalid struct type"));
264 };
265 Ok(Arc::new(arrow_array::StructArray::new(
266 fields.clone(),
267 array
268 .fields()
269 .zip_eq_fast(fields)
270 .map(|(arr, field)| self.to_array(field.data_type(), arr))
271 .try_collect::<_, _, ArrayError>()?,
272 Some(array.null_bitmap().into()),
273 )))
274 }
275
276 #[inline]
277 fn map_to_arrow(
278 &self,
279 data_type: &arrow_schema::DataType,
280 array: &MapArray,
281 ) -> Result<arrow_array::ArrayRef, ArrayError> {
282 let arrow_schema::DataType::Map(field, ordered) = data_type else {
283 return Err(ArrayError::to_arrow("Invalid map type"));
284 };
285 if *ordered {
286 return Err(ArrayError::to_arrow("Sorted map is not supported"));
287 }
288 let values = self
289 .struct_to_arrow(field.data_type(), array.as_struct())?
290 .as_struct()
291 .clone();
292 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
293 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
294 Ok(Arc::new(arrow_array::MapArray::new(
295 field.clone(),
296 offsets,
297 values,
298 nulls,
299 *ordered,
300 )))
301 }
302
303 fn to_arrow_field(
308 &self,
309 name: &str,
310 value: &DataType,
311 ) -> Result<arrow_schema::Field, ArrayError> {
312 let data_type = match value {
313 DataType::Boolean => self.bool_type_to_arrow(),
315 DataType::Int16 => self.int16_type_to_arrow(),
316 DataType::Int32 => self.int32_type_to_arrow(),
317 DataType::Int64 => self.int64_type_to_arrow(),
318 DataType::Int256 => self.int256_type_to_arrow(),
319 DataType::Float32 => self.float32_type_to_arrow(),
320 DataType::Float64 => self.float64_type_to_arrow(),
321 DataType::Date => self.date_type_to_arrow(),
322 DataType::Time => self.time_type_to_arrow(),
323 DataType::Timestamp => self.timestamp_type_to_arrow(),
324 DataType::Timestamptz => self.timestamptz_type_to_arrow(),
325 DataType::Interval => self.interval_type_to_arrow(),
326 DataType::Varchar => self.varchar_type_to_arrow(),
327 DataType::Bytea => self.bytea_type_to_arrow(),
328 DataType::Serial => self.serial_type_to_arrow(),
329 DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
330 DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
331 DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
332 DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
333 DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
334 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
335 };
336 Ok(arrow_schema::Field::new(name, data_type, true))
337 }
338
339 #[inline]
340 fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
341 arrow_schema::DataType::Boolean
342 }
343
344 #[inline]
345 fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
346 arrow_schema::DataType::Int16
347 }
348
349 #[inline]
350 fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
351 arrow_schema::DataType::Int32
352 }
353
354 #[inline]
355 fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
356 arrow_schema::DataType::Int64
357 }
358
359 #[inline]
360 fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
361 arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
362 }
363
364 #[inline]
365 fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
366 arrow_schema::DataType::Float32
367 }
368
369 #[inline]
370 fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
371 arrow_schema::DataType::Float64
372 }
373
374 #[inline]
375 fn date_type_to_arrow(&self) -> arrow_schema::DataType {
376 arrow_schema::DataType::Date32
377 }
378
379 #[inline]
380 fn time_type_to_arrow(&self) -> arrow_schema::DataType {
381 arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
382 }
383
384 #[inline]
385 fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
386 arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
387 }
388
389 #[inline]
390 fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
391 arrow_schema::DataType::Timestamp(
392 arrow_schema::TimeUnit::Microsecond,
393 Some("+00:00".into()),
394 )
395 }
396
397 #[inline]
398 fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
399 arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
400 }
401
402 #[inline]
403 fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
404 arrow_schema::DataType::Utf8
405 }
406
407 #[inline]
408 fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
409 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
410 .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
411 }
412
413 #[inline]
414 fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
415 arrow_schema::DataType::Binary
416 }
417
418 #[inline]
419 fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
420 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
421 .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
422 }
423
424 #[inline]
425 fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
426 arrow_schema::DataType::Int64
427 }
428
429 #[inline]
430 fn list_type_to_arrow(
431 &self,
432 elem_type: &DataType,
433 ) -> Result<arrow_schema::DataType, ArrayError> {
434 Ok(arrow_schema::DataType::List(Arc::new(
435 self.to_arrow_field("item", elem_type)?,
436 )))
437 }
438
439 #[inline]
440 fn struct_type_to_arrow(
441 &self,
442 fields: &StructType,
443 ) -> Result<arrow_schema::DataType, ArrayError> {
444 Ok(arrow_schema::DataType::Struct(
445 fields
446 .iter()
447 .map(|(name, ty)| self.to_arrow_field(name, ty))
448 .try_collect::<_, _, ArrayError>()?,
449 ))
450 }
451
452 #[inline]
453 fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
454 let sorted = false;
455 let key = self
457 .to_arrow_field("key", map_type.key())?
458 .with_nullable(false);
459 let value = self.to_arrow_field("value", map_type.value())?;
460 Ok(arrow_schema::DataType::Map(
461 Arc::new(arrow_schema::Field::new(
462 "entries",
463 arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
464 false,
466 )),
467 sorted,
468 ))
469 }
470}
471
472#[allow(clippy::wrong_self_convention)]
474pub trait FromArrow {
475 fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
477 let mut columns = Vec::with_capacity(batch.num_columns());
478 for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
479 let column = Arc::new(self.from_array(field, array)?);
480 columns.push(column);
481 }
482 Ok(DataChunk::new(columns, batch.num_rows()))
483 }
484
485 fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
487 Ok(StructType::new(
488 fields
489 .iter()
490 .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
491 .try_collect::<_, Vec<_>, ArrayError>()?,
492 ))
493 }
494
495 fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
497 use arrow_schema::DataType::*;
498 use arrow_schema::IntervalUnit::*;
499 use arrow_schema::TimeUnit::*;
500
501 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
503 return self.from_extension_type(type_name, field.data_type());
504 }
505
506 Ok(match field.data_type() {
507 Boolean => DataType::Boolean,
508 Int16 => DataType::Int16,
509 Int32 => DataType::Int32,
510 Int64 => DataType::Int64,
511 Int8 => DataType::Int16,
512 UInt8 => DataType::Int16,
513 UInt16 => DataType::Int32,
514 UInt32 => DataType::Int64,
515 UInt64 => DataType::Decimal,
516 Float16 => DataType::Float32,
517 Float32 => DataType::Float32,
518 Float64 => DataType::Float64,
519 Decimal128(_, _) => DataType::Decimal,
520 Decimal256(_, _) => DataType::Int256,
521 Date32 => DataType::Date,
522 Time64(Microsecond) => DataType::Time,
523 Timestamp(Microsecond, None) => DataType::Timestamp,
524 Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
525 Timestamp(Second, None) => DataType::Timestamp,
526 Timestamp(Second, Some(_)) => DataType::Timestamptz,
527 Timestamp(Millisecond, None) => DataType::Timestamp,
528 Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
529 Timestamp(Nanosecond, None) => DataType::Timestamp,
530 Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
531 Interval(MonthDayNano) => DataType::Interval,
532 Utf8 => DataType::Varchar,
533 Binary => DataType::Bytea,
534 LargeUtf8 => self.from_large_utf8()?,
535 LargeBinary => self.from_large_binary()?,
536 List(field) => DataType::List(Box::new(self.from_field(field)?)),
537 Struct(fields) => DataType::Struct(self.from_fields(fields)?),
538 Map(field, _is_sorted) => {
539 let entries = self.from_field(field)?;
540 DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
541 ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
542 })?)
543 }
544 t => {
545 return Err(ArrayError::from_arrow(format!(
546 "unsupported arrow data type: {t:?}"
547 )));
548 }
549 })
550 }
551
552 fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
554 Ok(DataType::Varchar)
555 }
556
557 fn from_large_binary(&self) -> Result<DataType, ArrayError> {
559 Ok(DataType::Bytea)
560 }
561
562 fn from_extension_type(
564 &self,
565 type_name: &str,
566 physical_type: &arrow_schema::DataType,
567 ) -> Result<DataType, ArrayError> {
568 match (type_name, physical_type) {
569 ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
570 ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
571 _ => Err(ArrayError::from_arrow(format!(
572 "unsupported extension type: {type_name:?}"
573 ))),
574 }
575 }
576
577 fn from_array(
579 &self,
580 field: &arrow_schema::Field,
581 array: &arrow_array::ArrayRef,
582 ) -> Result<ArrayImpl, ArrayError> {
583 use arrow_schema::DataType::*;
584 use arrow_schema::IntervalUnit::*;
585 use arrow_schema::TimeUnit::*;
586
587 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
589 return self.from_extension_array(type_name, array);
590 }
591 match array.data_type() {
592 Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
593 Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
594 Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
595 Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
596 Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
597 UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
598 UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
599 UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
600
601 UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
602 Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
603 Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
604 Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
605 Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
606 Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
607 Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
608 Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
609 Timestamp(Second, None) => {
610 self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
611 }
612 Timestamp(Second, Some(_)) => {
613 self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
614 }
615 Timestamp(Millisecond, None) => {
616 self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
617 }
618 Timestamp(Millisecond, Some(_)) => {
619 self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
620 }
621 Timestamp(Microsecond, None) => {
622 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
623 }
624 Timestamp(Microsecond, Some(_)) => {
625 self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
626 }
627 Timestamp(Nanosecond, None) => {
628 self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
629 }
630 Timestamp(Nanosecond, Some(_)) => {
631 self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
632 }
633 Interval(MonthDayNano) => {
634 self.from_interval_array(array.as_any().downcast_ref().unwrap())
635 }
636 Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
637 Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
638 LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
639 LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
640 List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
641 Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
642 Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
643 t => Err(ArrayError::from_arrow(format!(
644 "unsupported arrow data type: {t:?}",
645 ))),
646 }
647 }
648
649 fn from_extension_array(
651 &self,
652 type_name: &str,
653 array: &arrow_array::ArrayRef,
654 ) -> Result<ArrayImpl, ArrayError> {
655 match type_name {
656 "arrowudf.decimal" => {
657 let array: &arrow_array::StringArray =
658 array.as_any().downcast_ref().ok_or_else(|| {
659 ArrayError::from_arrow(
660 "expected string array for `arrowudf.decimal`".to_owned(),
661 )
662 })?;
663 Ok(ArrayImpl::Decimal(array.try_into()?))
664 }
665 "arrowudf.json" => {
666 let array: &arrow_array::StringArray =
667 array.as_any().downcast_ref().ok_or_else(|| {
668 ArrayError::from_arrow(
669 "expected string array for `arrowudf.json`".to_owned(),
670 )
671 })?;
672 Ok(ArrayImpl::Jsonb(array.try_into()?))
673 }
674 _ => Err(ArrayError::from_arrow(format!(
675 "unsupported extension type: {type_name:?}"
676 ))),
677 }
678 }
679
680 fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
681 Ok(ArrayImpl::Bool(array.into()))
682 }
683
684 fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
685 Ok(ArrayImpl::Int16(array.into()))
686 }
687
688 fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
689 Ok(ArrayImpl::Int16(array.into()))
690 }
691
692 fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
693 Ok(ArrayImpl::Int16(array.into()))
694 }
695
696 fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
697 Ok(ArrayImpl::Int32(array.into()))
698 }
699
700 fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
701 Ok(ArrayImpl::Int64(array.into()))
702 }
703
704 fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
705 Ok(ArrayImpl::Int32(array.into()))
706 }
707
708 fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
709 Ok(ArrayImpl::Int64(array.into()))
710 }
711
712 fn from_int256_array(
713 &self,
714 array: &arrow_array::Decimal256Array,
715 ) -> Result<ArrayImpl, ArrayError> {
716 Ok(ArrayImpl::Int256(array.into()))
717 }
718
719 fn from_decimal128_array(
720 &self,
721 array: &arrow_array::Decimal128Array,
722 ) -> Result<ArrayImpl, ArrayError> {
723 Ok(ArrayImpl::Decimal(array.try_into()?))
724 }
725
726 fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
727 Ok(ArrayImpl::Decimal(array.try_into()?))
728 }
729
730 fn from_float16_array(
731 &self,
732 array: &arrow_array::Float16Array,
733 ) -> Result<ArrayImpl, ArrayError> {
734 Ok(ArrayImpl::Float32(array.try_into()?))
735 }
736
737 fn from_float32_array(
738 &self,
739 array: &arrow_array::Float32Array,
740 ) -> Result<ArrayImpl, ArrayError> {
741 Ok(ArrayImpl::Float32(array.into()))
742 }
743
744 fn from_float64_array(
745 &self,
746 array: &arrow_array::Float64Array,
747 ) -> Result<ArrayImpl, ArrayError> {
748 Ok(ArrayImpl::Float64(array.into()))
749 }
750
751 fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
752 Ok(ArrayImpl::Date(array.into()))
753 }
754
755 fn from_time64us_array(
756 &self,
757 array: &arrow_array::Time64MicrosecondArray,
758 ) -> Result<ArrayImpl, ArrayError> {
759 Ok(ArrayImpl::Time(array.into()))
760 }
761
762 fn from_timestampsecond_array(
763 &self,
764 array: &arrow_array::TimestampSecondArray,
765 ) -> Result<ArrayImpl, ArrayError> {
766 Ok(ArrayImpl::Timestamp(array.into()))
767 }
768 fn from_timestampsecond_some_array(
769 &self,
770 array: &arrow_array::TimestampSecondArray,
771 ) -> Result<ArrayImpl, ArrayError> {
772 Ok(ArrayImpl::Timestamptz(array.into()))
773 }
774
775 fn from_timestampms_array(
776 &self,
777 array: &arrow_array::TimestampMillisecondArray,
778 ) -> Result<ArrayImpl, ArrayError> {
779 Ok(ArrayImpl::Timestamp(array.into()))
780 }
781
782 fn from_timestampms_some_array(
783 &self,
784 array: &arrow_array::TimestampMillisecondArray,
785 ) -> Result<ArrayImpl, ArrayError> {
786 Ok(ArrayImpl::Timestamptz(array.into()))
787 }
788
789 fn from_timestampus_array(
790 &self,
791 array: &arrow_array::TimestampMicrosecondArray,
792 ) -> Result<ArrayImpl, ArrayError> {
793 Ok(ArrayImpl::Timestamp(array.into()))
794 }
795
796 fn from_timestampus_some_array(
797 &self,
798 array: &arrow_array::TimestampMicrosecondArray,
799 ) -> Result<ArrayImpl, ArrayError> {
800 Ok(ArrayImpl::Timestamptz(array.into()))
801 }
802
803 fn from_timestampns_array(
804 &self,
805 array: &arrow_array::TimestampNanosecondArray,
806 ) -> Result<ArrayImpl, ArrayError> {
807 Ok(ArrayImpl::Timestamp(array.into()))
808 }
809
810 fn from_timestampns_some_array(
811 &self,
812 array: &arrow_array::TimestampNanosecondArray,
813 ) -> Result<ArrayImpl, ArrayError> {
814 Ok(ArrayImpl::Timestamptz(array.into()))
815 }
816
817 fn from_interval_array(
818 &self,
819 array: &arrow_array::IntervalMonthDayNanoArray,
820 ) -> Result<ArrayImpl, ArrayError> {
821 Ok(ArrayImpl::Interval(array.into()))
822 }
823
824 fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
825 Ok(ArrayImpl::Utf8(array.into()))
826 }
827
828 fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
829 Ok(ArrayImpl::Bytea(array.into()))
830 }
831
832 fn from_large_utf8_array(
833 &self,
834 array: &arrow_array::LargeStringArray,
835 ) -> Result<ArrayImpl, ArrayError> {
836 Ok(ArrayImpl::Utf8(array.into()))
837 }
838
839 fn from_large_binary_array(
840 &self,
841 array: &arrow_array::LargeBinaryArray,
842 ) -> Result<ArrayImpl, ArrayError> {
843 Ok(ArrayImpl::Bytea(array.into()))
844 }
845
846 fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
847 use arrow_array::Array;
848 let arrow_schema::DataType::List(field) = array.data_type() else {
849 panic!("nested field types cannot be determined.");
850 };
851 Ok(ArrayImpl::List(ListArray {
852 value: Box::new(self.from_array(field, array.values())?),
853 bitmap: match array.nulls() {
854 Some(nulls) => nulls.iter().collect(),
855 None => Bitmap::ones(array.len()),
856 },
857 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
858 }))
859 }
860
861 fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
862 use arrow_array::Array;
863 let arrow_schema::DataType::Struct(fields) = array.data_type() else {
864 panic!("nested field types cannot be determined.");
865 };
866 Ok(ArrayImpl::Struct(StructArray::new(
867 self.from_fields(fields)?,
868 array
869 .columns()
870 .iter()
871 .zip_eq_fast(fields)
872 .map(|(array, field)| self.from_array(field, array).map(Arc::new))
873 .try_collect()?,
874 (0..array.len()).map(|i| array.is_valid(i)).collect(),
875 )))
876 }
877
878 fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
879 use arrow_array::Array;
880 let struct_array = self.from_struct_array(array.entries())?;
881 let list_array = ListArray {
882 value: Box::new(struct_array),
883 bitmap: match array.nulls() {
884 Some(nulls) => nulls.iter().collect(),
885 None => Bitmap::ones(array.len()),
886 },
887 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
888 };
889
890 Ok(ArrayImpl::Map(MapArray { inner: list_array }))
891 }
892}
893
894impl From<&Bitmap> for arrow_buffer::NullBuffer {
895 fn from(bitmap: &Bitmap) -> Self {
896 bitmap.iter().collect()
897 }
898}
899
/// Generates `From` conversions between a RisingWave array type and an Arrow array type.
///
/// For `($ArrayType, $ArrowType)` three impls are produced:
/// `From<&$ArrayType> for $ArrowType`, `From<&$ArrowType> for $ArrayType` and
/// `From<&[$ArrowType]> for $ArrayType` (which concatenates the slice in order).
///
/// The plain form collects the items as they are. The `@map` form converts every
/// non-null item through the `FromIntoArrow` implementation of the array's `RefItem`.
macro_rules! converts {
    // Plain form: both sides yield the same item type, so the iterator is collected directly.
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array.iter().collect()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().collect()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays.iter().flat_map(|a| a.iter()).collect()
            }
        }
    };
    // `@map` form: item types differ, so each non-null value goes through `FromIntoArrow`.
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array.iter().map(|o| o.map(|v| v.into_arrow())).collect()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v)
                        })
                    })
                    .collect()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter())
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v)
                        })
                    })
                    .collect()
            }
        }
    };
}
953
/// Generates `From` conversions between two array types whose items differ only by a
/// primitive numeric type.
///
/// Values going from `$ArrayType` to `$ArrowType` are converted with `as $ToType`; values
/// going from `$ArrowType` (or a slice of them, concatenated in order) to `$ArrayType` are
/// converted with `as $FromType`. Nulls are preserved.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|value| value as $ToType))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|value| value as $FromType))
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| item.map(|value| value as $FromType))
                    .collect()
            }
        }
    };
}
984
/// Generates `From` conversions between a RisingWave array type and an Arrow array type
/// whose values depend on a time unit.
///
/// Every non-null item is converted through the `FromIntoArrowWithUnit` implementation of
/// the array's `RefItem`, passing `$time_unit` along. Three impls are produced:
/// `From<&$ArrayType> for $ArrowType`, `From<&$ArrowType> for $ArrayType` and
/// `From<&[$ArrowType]> for $ArrayType` (which concatenates the slice in order).
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1021
// Conversions generated by `converts!`. The plain form collects items directly; the `@map`
// form converts each item through `FromIntoArrow`.
converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
converts!(I64Array, arrow_array::Int64Array);
converts!(F32Array, arrow_array::Float32Array, @map);
converts!(F64Array, arrow_array::Float64Array, @map);
converts!(BytesArray, arrow_array::BinaryArray);
converts!(BytesArray, arrow_array::LargeBinaryArray);
converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

// Arrow integer types without a same-width RisingWave counterpart are paired with a wider
// signed array type; values are converted with `as` casts.
converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);

// `TimestampArray` is convertible to and from Arrow timestamp arrays of every time unit.
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

// `TimestamptzArray` is convertible to and from Arrow timestamp arrays of every time unit.
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1051
/// Converts a single scalar value to and from its native Arrow representation.
///
/// Implemented in this file for `Serial`, `F32`, `F64`, `Date`, `Time` and `Interval`.
trait FromIntoArrow {
    /// The native Arrow-side type the value is exchanged as.
    type ArrowType;
    /// Builds the value from its Arrow representation.
    fn from_arrow(value: Self::ArrowType) -> Self;
    /// Produces the Arrow representation of the value, consuming it.
    fn into_arrow(self) -> Self::ArrowType;
}
1059
/// Like `FromIntoArrow`, for values whose Arrow representation depends on an extra unit
/// argument.
///
/// Implemented in this file for `Timestamp` and `Timestamptz`, both with `TimeUnit` as the unit.
trait FromIntoArrowWithUnit {
    /// The native Arrow-side type the value is exchanged as.
    type ArrowType;
    /// The type of the unit argument that selects how `ArrowType` is interpreted.
    type TimestampType;
    /// Builds the value from an Arrow value expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
    /// Produces the Arrow value expressed in `time_unit`, consuming `self`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1069
1070impl FromIntoArrow for Serial {
1071 type ArrowType = i64;
1072
1073 fn from_arrow(value: Self::ArrowType) -> Self {
1074 value.into()
1075 }
1076
1077 fn into_arrow(self) -> Self::ArrowType {
1078 self.into()
1079 }
1080}
1081
1082impl FromIntoArrow for F32 {
1083 type ArrowType = f32;
1084
1085 fn from_arrow(value: Self::ArrowType) -> Self {
1086 value.into()
1087 }
1088
1089 fn into_arrow(self) -> Self::ArrowType {
1090 self.into()
1091 }
1092}
1093
1094impl FromIntoArrow for F64 {
1095 type ArrowType = f64;
1096
1097 fn from_arrow(value: Self::ArrowType) -> Self {
1098 value.into()
1099 }
1100
1101 fn into_arrow(self) -> Self::ArrowType {
1102 self.into()
1103 }
1104}
1105
1106impl FromIntoArrow for Date {
1107 type ArrowType = i32;
1108
1109 fn from_arrow(value: Self::ArrowType) -> Self {
1110 Date(arrow_array::types::Date32Type::to_naive_date(value))
1111 }
1112
1113 fn into_arrow(self) -> Self::ArrowType {
1114 arrow_array::types::Date32Type::from_naive_date(self.0)
1115 }
1116}
1117
1118impl FromIntoArrow for Time {
1119 type ArrowType = i64;
1120
1121 fn from_arrow(value: Self::ArrowType) -> Self {
1122 Time(
1123 NaiveTime::from_num_seconds_from_midnight_opt(
1124 (value / 1_000_000) as _,
1125 (value % 1_000_000 * 1000) as _,
1126 )
1127 .unwrap(),
1128 )
1129 }
1130
1131 fn into_arrow(self) -> Self::ArrowType {
1132 self.0
1133 .signed_duration_since(NaiveTime::default())
1134 .num_microseconds()
1135 .unwrap()
1136 }
1137}
1138
1139impl FromIntoArrowWithUnit for Timestamp {
1140 type ArrowType = i64;
1141 type TimestampType = TimeUnit;
1142
1143 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1144 match time_unit {
1145 TimeUnit::Second => {
1146 Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1147 }
1148 TimeUnit::Millisecond => {
1149 Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1150 }
1151 TimeUnit::Microsecond => {
1152 Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1153 }
1154 TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1155 }
1156 }
1157
1158 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1159 match time_unit {
1160 TimeUnit::Second => self.0.and_utc().timestamp(),
1161 TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1162 TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1163 TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1164 }
1165 }
1166}
1167
1168impl FromIntoArrowWithUnit for Timestamptz {
1169 type ArrowType = i64;
1170 type TimestampType = TimeUnit;
1171
1172 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1173 match time_unit {
1174 TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1175 TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1176 TimeUnit::Microsecond => Timestamptz::from_micros(value),
1177 TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1178 }
1179 }
1180
1181 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1182 match time_unit {
1183 TimeUnit::Second => self.timestamp(),
1184 TimeUnit::Millisecond => self.timestamp_millis(),
1185 TimeUnit::Microsecond => self.timestamp_micros(),
1186 TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1187 }
1188 }
1189}
1190
1191impl FromIntoArrow for Interval {
1192 type ArrowType = ArrowIntervalType;
1193
1194 fn from_arrow(value: Self::ArrowType) -> Self {
1195 Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1196 }
1197
1198 fn into_arrow(self) -> Self::ArrowType {
1199 ArrowIntervalType {
1200 months: self.months(),
1201 days: self.days(),
1202 nanoseconds: self.usecs() * 1000,
1204 }
1205 }
1206}
1207
1208impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1209 fn from(array: &DecimalArray) -> Self {
1210 let mut builder =
1211 arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1212 for value in array.iter() {
1213 builder.append_option(value.map(|d| d.to_string()));
1214 }
1215 builder.finish()
1216 }
1217}
1218
1219impl From<&DecimalArray> for arrow_array::StringArray {
1220 fn from(array: &DecimalArray) -> Self {
1221 let mut builder =
1222 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1223 for value in array.iter() {
1224 builder.append_option(value.map(|d| d.to_string()));
1225 }
1226 builder.finish()
1227 }
1228}
1229
1230impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1232 type Error = ArrayError;
1233
1234 fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1235 if array.scale() < 0 {
1236 bail!("support negative scale for arrow decimal")
1237 }
1238 let from_arrow = |value| {
1239 const NAN: i128 = i128::MIN + 1;
1240 let res = match value {
1241 NAN => Decimal::NaN,
1242 i128::MAX => Decimal::PositiveInf,
1243 i128::MIN => Decimal::NegativeInf,
1244 _ => Decimal::Normalized(
1245 rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1246 .map_err(ArrayError::internal)?,
1247 ),
1248 };
1249 Ok(res)
1250 };
1251 array
1252 .iter()
1253 .map(|o| o.map(from_arrow).transpose())
1254 .collect::<Result<Self, Self::Error>>()
1255 }
1256}
1257
1258impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1260 type Error = ArrayError;
1261
1262 fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1263 let from_arrow = |value| {
1264 let res = Decimal::from(value);
1266 Ok(res)
1267 };
1268
1269 array
1271 .iter()
1272 .map(|o| o.map(from_arrow).transpose())
1273 .collect::<Result<Self, Self::Error>>()
1274 }
1275}
1276
1277impl TryFrom<&arrow_array::Float16Array> for F32Array {
1278 type Error = ArrayError;
1279
1280 fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1281 let from_arrow = |value| Ok(f32::from(value));
1282
1283 array
1284 .iter()
1285 .map(|o| o.map(from_arrow).transpose())
1286 .collect::<Result<Self, Self::Error>>()
1287 }
1288}
1289
1290impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1291 type Error = ArrayError;
1292
1293 fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1294 array
1295 .iter()
1296 .map(|o| {
1297 o.map(|s| {
1298 let s = std::str::from_utf8(s)
1299 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1300 s.parse()
1301 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1302 })
1303 .transpose()
1304 })
1305 .try_collect()
1306 }
1307}
1308
1309impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1310 type Error = ArrayError;
1311
1312 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1313 array
1314 .iter()
1315 .map(|o| {
1316 o.map(|s| {
1317 s.parse()
1318 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1319 })
1320 .transpose()
1321 })
1322 .try_collect()
1323 }
1324}
1325
1326impl From<&JsonbArray> for arrow_array::StringArray {
1327 fn from(array: &JsonbArray) -> Self {
1328 let mut builder =
1329 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1330 for value in array.iter() {
1331 match value {
1332 Some(jsonb) => {
1333 write!(&mut builder, "{}", jsonb).unwrap();
1334 builder.append_value("");
1335 }
1336 None => builder.append_null(),
1337 }
1338 }
1339 builder.finish()
1340 }
1341}
1342
1343impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1344 type Error = ArrayError;
1345
1346 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1347 array
1348 .iter()
1349 .map(|o| {
1350 o.map(|s| {
1351 s.parse()
1352 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1353 })
1354 .transpose()
1355 })
1356 .try_collect()
1357 }
1358}
1359
1360impl From<&IntervalArray> for arrow_array::StringArray {
1361 fn from(array: &IntervalArray) -> Self {
1362 let mut builder =
1363 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1364 for value in array.iter() {
1365 match value {
1366 Some(interval) => {
1367 write!(&mut builder, "{}", interval).unwrap();
1368 builder.append_value("");
1369 }
1370 None => builder.append_null(),
1371 }
1372 }
1373 builder.finish()
1374 }
1375}
1376
1377impl From<&JsonbArray> for arrow_array::LargeStringArray {
1378 fn from(array: &JsonbArray) -> Self {
1379 let mut builder =
1380 arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1381 for value in array.iter() {
1382 match value {
1383 Some(jsonb) => {
1384 write!(&mut builder, "{}", jsonb).unwrap();
1385 builder.append_value("");
1386 }
1387 None => builder.append_null(),
1388 }
1389 }
1390 builder.finish()
1391 }
1392}
1393
1394impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1395 type Error = ArrayError;
1396
1397 fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1398 array
1399 .iter()
1400 .map(|o| {
1401 o.map(|s| {
1402 s.parse()
1403 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1404 })
1405 .transpose()
1406 })
1407 .try_collect()
1408 }
1409}
1410
1411impl From<arrow_buffer::i256> for Int256 {
1412 fn from(value: arrow_buffer::i256) -> Self {
1413 let buffer = value.to_be_bytes();
1414 Int256::from_be_bytes(buffer)
1415 }
1416}
1417
1418impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1419 fn from(val: Int256Ref<'a>) -> Self {
1420 let buffer = val.to_be_bytes();
1421 arrow_buffer::i256::from_be_bytes(buffer)
1422 }
1423}
1424
1425impl From<&Int256Array> for arrow_array::Decimal256Array {
1426 fn from(array: &Int256Array) -> Self {
1427 array
1428 .iter()
1429 .map(|o| o.map(arrow_buffer::i256::from))
1430 .collect()
1431 }
1432}
1433
1434impl From<&arrow_array::Decimal256Array> for Int256Array {
1435 fn from(array: &arrow_array::Decimal256Array) -> Self {
1436 let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1437
1438 values
1439 .iter()
1440 .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1441 .collect()
1442 }
1443}
1444
1445pub fn is_parquet_schema_match_source_schema(
1461 arrow_data_type: &arrow_schema::DataType,
1462 rw_data_type: &crate::types::DataType,
1463) -> bool {
1464 use arrow_schema::DataType as ArrowType;
1465
1466 use crate::types::{DataType as RwType, MapType, StructType};
1467
1468 match (arrow_data_type, rw_data_type) {
1469 (ArrowType::Boolean, RwType::Boolean)
1471 | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1472 | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1473 | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1474 | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1475 | (ArrowType::Decimal256(_, _), RwType::Int256)
1476 | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1477 | (ArrowType::Float64, RwType::Float64)
1478 | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1479 | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1480 | (ArrowType::Date32, RwType::Date)
1481 | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1482 | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1483 | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1484 | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1485
1486 (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1489 if arrow_fields.len() != rw_struct.len() {
1490 return false;
1491 }
1492 for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1493 {
1494 if arrow_field.name() != rw_name {
1495 return false;
1496 }
1497 if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1498 return false;
1499 }
1500 }
1501 true
1502 }
1503 (ArrowType::List(arrow_field), RwType::List(rw_elem_ty)) => {
1506 is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_elem_ty)
1507 }
1508 (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1512 if let ArrowType::Struct(fields) = arrow_field.data_type() {
1513 if fields.len() != 2 {
1514 return false;
1515 }
1516 let key_field = &fields[0];
1517 let value_field = &fields[1];
1518 if key_field.name() != "key" || value_field.name() != "value" {
1519 return false;
1520 }
1521 let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1522 is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1523 && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1524 } else {
1525 false
1526 }
1527 }
1528 _ => false,
1530 }
1531}
// Unit tests: schema-matching rules first, then round-trip / conversion checks per array type.
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Structs match only when field names and field types agree pairwise.
    #[test]
    fn test_struct_schema_match() {
        // Arrow struct with fields f1: Float64, f2: Utf8.
        let arrow_struct = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        // RisingWave struct with fields f1: Float64, f2: Varchar.
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct,
            &rw_struct
        ));

        // Same types, but the second field is named `f3`: the name check must reject it.
        let arrow_struct2 = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_struct2,
            &rw_struct
        ));
    }

    /// Lists match only when their element types match.
    #[test]
    fn test_list_schema_match() {
        // Arrow list of Float64.
        let arrow_list =
            ArrowType::List(Box::new(ArrowField::new("item", ArrowType::Float64, true)).into());
        // RisingWave list of Float64.
        let rw_list = RwType::List(Box::new(RwType::Float64));
        assert!(is_parquet_schema_match_source_schema(&arrow_list, &rw_list));

        // Element type differs (Int32 vs Float64).
        let rw_list2 = RwType::List(Box::new(RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_list2
        ));
    }

    /// Maps match only when the entries struct is (`key`, `value`) with matching types.
    #[test]
    fn test_map_schema_match() {
        // Arrow map from Utf8 to Int32.
        let arrow_map = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("key", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        // RisingWave map from Varchar to Int32.
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        // Key type differs.
        let rw_map2 = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map2));

        // Value type differs.
        let rw_map3 = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map3));

        // Entries struct whose first field is named `k` instead of `key`.
        let arrow_map2 = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("k", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        assert!(!is_parquet_schema_match_source_schema(&arrow_map2, &rw_map));
    }

    // The tests below convert to Arrow and back and expect the original array.

    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    // The next tests go one way only: an Arrow integer array into a wider RisingWave array,
    // including the extreme values of the Arrow type.

    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// `u64` values, including `u64::MAX`, become decimals.
    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    #[test]
    fn time() {
        // One microsecond before the end of the day.
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    #[test]
    fn interval() {
        // One positive and one negative interval with all three components set.
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    /// Decimals, including the special values, survive a trip through their text form.
    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        // Through a large binary array.
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        // Through a string array.
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    /// JSONB values of several shapes survive a trip through their text form.
    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        // Through a large string array.
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        // Through a string array.
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    #[test]
    fn int256() {
        // Values of growing magnitude, plus both extremes of the type.
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}