1#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55use crate::array::*;
57use crate::types::{DataType as RwDataType, *};
58use crate::util::iter_util::ZipEqFast;
59
60pub trait ToArrow {
65 fn to_record_batch(
69 &self,
70 schema: arrow_schema::SchemaRef,
71 chunk: &DataChunk,
72 ) -> Result<arrow_array::RecordBatch, ArrayError> {
73 if !chunk.is_compacted() {
75 let c = chunk.clone();
76 return self.to_record_batch(schema, &c.compact());
77 }
78
79 let columns: Vec<_> = chunk
81 .columns()
82 .iter()
83 .zip_eq_fast(schema.fields().iter())
84 .map(|(column, field)| self.to_array(field.data_type(), column))
85 .try_collect()?;
86
87 let opts =
89 arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90 arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91 .map_err(ArrayError::to_arrow)
92 }
93
94 fn to_array(
96 &self,
97 data_type: &arrow_schema::DataType,
98 array: &ArrayImpl,
99 ) -> Result<arrow_array::ArrayRef, ArrayError> {
100 let arrow_array = match array {
101 ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102 ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103 ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104 ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105 ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106 ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107 ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108 ArrayImpl::Date(array) => self.date_to_arrow(array),
109 ArrayImpl::Time(array) => self.time_to_arrow(array),
110 ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111 ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112 ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113 ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114 ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115 ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116 ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117 ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118 ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119 ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120 ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121 }?;
122 if arrow_array.data_type() != data_type {
123 arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
124 } else {
125 Ok(arrow_array)
126 }
127 }
128
129 #[inline]
130 fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
131 Ok(Arc::new(arrow_array::BooleanArray::from(array)))
132 }
133
134 #[inline]
135 fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
136 Ok(Arc::new(arrow_array::Int16Array::from(array)))
137 }
138
139 #[inline]
140 fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
141 Ok(Arc::new(arrow_array::Int32Array::from(array)))
142 }
143
144 #[inline]
145 fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
146 Ok(Arc::new(arrow_array::Int64Array::from(array)))
147 }
148
149 #[inline]
150 fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
151 Ok(Arc::new(arrow_array::Float32Array::from(array)))
152 }
153
154 #[inline]
155 fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
156 Ok(Arc::new(arrow_array::Float64Array::from(array)))
157 }
158
159 #[inline]
160 fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
161 Ok(Arc::new(arrow_array::StringArray::from(array)))
162 }
163
164 #[inline]
165 fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
166 Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
167 }
168
169 #[inline]
170 fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
171 Ok(Arc::new(arrow_array::Date32Array::from(array)))
172 }
173
174 #[inline]
175 fn timestamp_to_arrow(
176 &self,
177 array: &TimestampArray,
178 ) -> Result<arrow_array::ArrayRef, ArrayError> {
179 Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
180 array,
181 )))
182 }
183
184 #[inline]
185 fn timestamptz_to_arrow(
186 &self,
187 array: &TimestamptzArray,
188 ) -> Result<arrow_array::ArrayRef, ArrayError> {
189 Ok(Arc::new(
190 arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
191 ))
192 }
193
194 #[inline]
195 fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
196 Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
197 }
198
199 #[inline]
200 fn interval_to_arrow(
201 &self,
202 array: &IntervalArray,
203 ) -> Result<arrow_array::ArrayRef, ArrayError> {
204 Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
205 array,
206 )))
207 }
208
209 #[inline]
210 fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
211 Ok(Arc::new(arrow_array::BinaryArray::from(array)))
212 }
213
214 #[inline]
216 fn decimal_to_arrow(
217 &self,
218 _data_type: &arrow_schema::DataType,
219 array: &DecimalArray,
220 ) -> Result<arrow_array::ArrayRef, ArrayError> {
221 Ok(Arc::new(arrow_array::StringArray::from(array)))
222 }
223
224 #[inline]
226 fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
227 Ok(Arc::new(arrow_array::StringArray::from(array)))
228 }
229
230 #[inline]
231 fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
232 Ok(Arc::new(arrow_array::Int64Array::from(array)))
233 }
234
235 #[inline]
236 fn list_to_arrow(
237 &self,
238 data_type: &arrow_schema::DataType,
239 array: &ListArray,
240 ) -> Result<arrow_array::ArrayRef, ArrayError> {
241 let arrow_schema::DataType::List(field) = data_type else {
242 return Err(ArrayError::to_arrow("Invalid list type"));
243 };
244 let values = self.to_array(field.data_type(), array.values())?;
245 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
246 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
247 Ok(Arc::new(arrow_array::ListArray::new(
248 field.clone(),
249 offsets,
250 values,
251 nulls,
252 )))
253 }
254
255 #[inline]
256 fn struct_to_arrow(
257 &self,
258 data_type: &arrow_schema::DataType,
259 array: &StructArray,
260 ) -> Result<arrow_array::ArrayRef, ArrayError> {
261 let arrow_schema::DataType::Struct(fields) = data_type else {
262 return Err(ArrayError::to_arrow("Invalid struct type"));
263 };
264 Ok(Arc::new(arrow_array::StructArray::new(
265 fields.clone(),
266 array
267 .fields()
268 .zip_eq_fast(fields)
269 .map(|(arr, field)| self.to_array(field.data_type(), arr))
270 .try_collect::<_, _, ArrayError>()?,
271 Some(array.null_bitmap().into()),
272 )))
273 }
274
275 #[inline]
276 fn map_to_arrow(
277 &self,
278 data_type: &arrow_schema::DataType,
279 array: &MapArray,
280 ) -> Result<arrow_array::ArrayRef, ArrayError> {
281 let arrow_schema::DataType::Map(field, ordered) = data_type else {
282 return Err(ArrayError::to_arrow("Invalid map type"));
283 };
284 if *ordered {
285 return Err(ArrayError::to_arrow("Sorted map is not supported"));
286 }
287 let values = self
288 .struct_to_arrow(field.data_type(), array.as_struct())?
289 .as_struct()
290 .clone();
291 let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
292 let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
293 Ok(Arc::new(arrow_array::MapArray::new(
294 field.clone(),
295 offsets,
296 values,
297 nulls,
298 *ordered,
299 )))
300 }
301
302 fn to_arrow_field(
307 &self,
308 name: &str,
309 value: &DataType,
310 ) -> Result<arrow_schema::Field, ArrayError> {
311 let data_type = match value {
312 DataType::Boolean => self.bool_type_to_arrow(),
314 DataType::Int16 => self.int16_type_to_arrow(),
315 DataType::Int32 => self.int32_type_to_arrow(),
316 DataType::Int64 => self.int64_type_to_arrow(),
317 DataType::Int256 => self.int256_type_to_arrow(),
318 DataType::Float32 => self.float32_type_to_arrow(),
319 DataType::Float64 => self.float64_type_to_arrow(),
320 DataType::Date => self.date_type_to_arrow(),
321 DataType::Time => self.time_type_to_arrow(),
322 DataType::Timestamp => self.timestamp_type_to_arrow(),
323 DataType::Timestamptz => self.timestamptz_type_to_arrow(),
324 DataType::Interval => self.interval_type_to_arrow(),
325 DataType::Varchar => self.varchar_type_to_arrow(),
326 DataType::Bytea => self.bytea_type_to_arrow(),
327 DataType::Serial => self.serial_type_to_arrow(),
328 DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
329 DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
330 DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
331 DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
332 DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
333 };
334 Ok(arrow_schema::Field::new(name, data_type, true))
335 }
336
337 #[inline]
338 fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
339 arrow_schema::DataType::Boolean
340 }
341
342 #[inline]
343 fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
344 arrow_schema::DataType::Int16
345 }
346
347 #[inline]
348 fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
349 arrow_schema::DataType::Int32
350 }
351
352 #[inline]
353 fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
354 arrow_schema::DataType::Int64
355 }
356
357 #[inline]
358 fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
359 arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
360 }
361
362 #[inline]
363 fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
364 arrow_schema::DataType::Float32
365 }
366
367 #[inline]
368 fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
369 arrow_schema::DataType::Float64
370 }
371
372 #[inline]
373 fn date_type_to_arrow(&self) -> arrow_schema::DataType {
374 arrow_schema::DataType::Date32
375 }
376
377 #[inline]
378 fn time_type_to_arrow(&self) -> arrow_schema::DataType {
379 arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
380 }
381
382 #[inline]
383 fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
384 arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
385 }
386
387 #[inline]
388 fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
389 arrow_schema::DataType::Timestamp(
390 arrow_schema::TimeUnit::Microsecond,
391 Some("+00:00".into()),
392 )
393 }
394
395 #[inline]
396 fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
397 arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
398 }
399
400 #[inline]
401 fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
402 arrow_schema::DataType::Utf8
403 }
404
405 #[inline]
406 fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
407 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
408 .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
409 }
410
411 #[inline]
412 fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
413 arrow_schema::DataType::Binary
414 }
415
416 #[inline]
417 fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
418 arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
419 .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
420 }
421
422 #[inline]
423 fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
424 arrow_schema::DataType::Int64
425 }
426
427 #[inline]
428 fn list_type_to_arrow(
429 &self,
430 elem_type: &DataType,
431 ) -> Result<arrow_schema::DataType, ArrayError> {
432 Ok(arrow_schema::DataType::List(Arc::new(
433 self.to_arrow_field("item", elem_type)?,
434 )))
435 }
436
437 #[inline]
438 fn struct_type_to_arrow(
439 &self,
440 fields: &StructType,
441 ) -> Result<arrow_schema::DataType, ArrayError> {
442 Ok(arrow_schema::DataType::Struct(
443 fields
444 .iter()
445 .map(|(name, ty)| self.to_arrow_field(name, ty))
446 .try_collect::<_, _, ArrayError>()?,
447 ))
448 }
449
450 #[inline]
451 fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
452 let sorted = false;
453 let key = self
455 .to_arrow_field("key", map_type.key())?
456 .with_nullable(false);
457 let value = self.to_arrow_field("value", map_type.value())?;
458 Ok(arrow_schema::DataType::Map(
459 Arc::new(arrow_schema::Field::new(
460 "entries",
461 arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
462 false,
464 )),
465 sorted,
466 ))
467 }
468}
469
470#[allow(clippy::wrong_self_convention)]
472pub trait FromArrow {
473 fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
475 let mut columns = Vec::with_capacity(batch.num_columns());
476 for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
477 let column = Arc::new(self.from_array(field, array)?);
478 columns.push(column);
479 }
480 Ok(DataChunk::new(columns, batch.num_rows()))
481 }
482
483 fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
485 Ok(StructType::new(
486 fields
487 .iter()
488 .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
489 .try_collect::<_, Vec<_>, ArrayError>()?,
490 ))
491 }
492
493 fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
495 use arrow_schema::DataType::*;
496 use arrow_schema::IntervalUnit::*;
497 use arrow_schema::TimeUnit::*;
498
499 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
501 return self.from_extension_type(type_name, field.data_type());
502 }
503
504 Ok(match field.data_type() {
505 Boolean => DataType::Boolean,
506 Int16 => DataType::Int16,
507 Int32 => DataType::Int32,
508 Int64 => DataType::Int64,
509 Int8 => DataType::Int16,
510 UInt8 => DataType::Int16,
511 UInt16 => DataType::Int32,
512 UInt32 => DataType::Int64,
513 UInt64 => DataType::Decimal,
514 Float16 => DataType::Float32,
515 Float32 => DataType::Float32,
516 Float64 => DataType::Float64,
517 Decimal128(_, _) => DataType::Decimal,
518 Decimal256(_, _) => DataType::Int256,
519 Date32 => DataType::Date,
520 Time64(Microsecond) => DataType::Time,
521 Timestamp(Microsecond, None) => DataType::Timestamp,
522 Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
523 Timestamp(Second, None) => DataType::Timestamp,
524 Timestamp(Second, Some(_)) => DataType::Timestamptz,
525 Timestamp(Millisecond, None) => DataType::Timestamp,
526 Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
527 Timestamp(Nanosecond, None) => DataType::Timestamp,
528 Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
529 Interval(MonthDayNano) => DataType::Interval,
530 Utf8 => DataType::Varchar,
531 Binary => DataType::Bytea,
532 LargeUtf8 => self.from_large_utf8()?,
533 LargeBinary => self.from_large_binary()?,
534 List(field) => DataType::List(Box::new(self.from_field(field)?)),
535 Struct(fields) => DataType::Struct(self.from_fields(fields)?),
536 Map(field, _is_sorted) => {
537 let entries = self.from_field(field)?;
538 DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
539 ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
540 })?)
541 }
542 t => {
543 return Err(ArrayError::from_arrow(format!(
544 "unsupported arrow data type: {t:?}"
545 )));
546 }
547 })
548 }
549
550 fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
552 Ok(DataType::Varchar)
553 }
554
555 fn from_large_binary(&self) -> Result<DataType, ArrayError> {
557 Ok(DataType::Bytea)
558 }
559
560 fn from_extension_type(
562 &self,
563 type_name: &str,
564 physical_type: &arrow_schema::DataType,
565 ) -> Result<DataType, ArrayError> {
566 match (type_name, physical_type) {
567 ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
568 ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
569 _ => Err(ArrayError::from_arrow(format!(
570 "unsupported extension type: {type_name:?}"
571 ))),
572 }
573 }
574
575 fn from_array(
577 &self,
578 field: &arrow_schema::Field,
579 array: &arrow_array::ArrayRef,
580 ) -> Result<ArrayImpl, ArrayError> {
581 use arrow_schema::DataType::*;
582 use arrow_schema::IntervalUnit::*;
583 use arrow_schema::TimeUnit::*;
584
585 if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
587 return self.from_extension_array(type_name, array);
588 }
589 match array.data_type() {
590 Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
591 Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
592 Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
593 Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
594 Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
595 UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
596 UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
597 UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
598
599 UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
600 Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
601 Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
602 Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
603 Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
604 Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
605 Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
606 Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
607 Timestamp(Second, None) => {
608 self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
609 }
610 Timestamp(Second, Some(_)) => {
611 self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
612 }
613 Timestamp(Millisecond, None) => {
614 self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
615 }
616 Timestamp(Millisecond, Some(_)) => {
617 self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
618 }
619 Timestamp(Microsecond, None) => {
620 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
621 }
622 Timestamp(Microsecond, Some(_)) => {
623 self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
624 }
625 Timestamp(Nanosecond, None) => {
626 self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
627 }
628 Timestamp(Nanosecond, Some(_)) => {
629 self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
630 }
631 Interval(MonthDayNano) => {
632 self.from_interval_array(array.as_any().downcast_ref().unwrap())
633 }
634 Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
635 Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
636 LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
637 LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
638 List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
639 Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
640 Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
641 t => Err(ArrayError::from_arrow(format!(
642 "unsupported arrow data type: {t:?}",
643 ))),
644 }
645 }
646
647 fn from_extension_array(
649 &self,
650 type_name: &str,
651 array: &arrow_array::ArrayRef,
652 ) -> Result<ArrayImpl, ArrayError> {
653 match type_name {
654 "arrowudf.decimal" => {
655 let array: &arrow_array::StringArray =
656 array.as_any().downcast_ref().ok_or_else(|| {
657 ArrayError::from_arrow(
658 "expected string array for `arrowudf.decimal`".to_owned(),
659 )
660 })?;
661 Ok(ArrayImpl::Decimal(array.try_into()?))
662 }
663 "arrowudf.json" => {
664 let array: &arrow_array::StringArray =
665 array.as_any().downcast_ref().ok_or_else(|| {
666 ArrayError::from_arrow(
667 "expected string array for `arrowudf.json`".to_owned(),
668 )
669 })?;
670 Ok(ArrayImpl::Jsonb(array.try_into()?))
671 }
672 _ => Err(ArrayError::from_arrow(format!(
673 "unsupported extension type: {type_name:?}"
674 ))),
675 }
676 }
677
678 fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
679 Ok(ArrayImpl::Bool(array.into()))
680 }
681
682 fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
683 Ok(ArrayImpl::Int16(array.into()))
684 }
685
686 fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
687 Ok(ArrayImpl::Int16(array.into()))
688 }
689
690 fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
691 Ok(ArrayImpl::Int16(array.into()))
692 }
693
694 fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
695 Ok(ArrayImpl::Int32(array.into()))
696 }
697
698 fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
699 Ok(ArrayImpl::Int64(array.into()))
700 }
701
702 fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
703 Ok(ArrayImpl::Int32(array.into()))
704 }
705
706 fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
707 Ok(ArrayImpl::Int64(array.into()))
708 }
709
710 fn from_int256_array(
711 &self,
712 array: &arrow_array::Decimal256Array,
713 ) -> Result<ArrayImpl, ArrayError> {
714 Ok(ArrayImpl::Int256(array.into()))
715 }
716
717 fn from_decimal128_array(
718 &self,
719 array: &arrow_array::Decimal128Array,
720 ) -> Result<ArrayImpl, ArrayError> {
721 Ok(ArrayImpl::Decimal(array.try_into()?))
722 }
723
724 fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
725 Ok(ArrayImpl::Decimal(array.try_into()?))
726 }
727
728 fn from_float16_array(
729 &self,
730 array: &arrow_array::Float16Array,
731 ) -> Result<ArrayImpl, ArrayError> {
732 Ok(ArrayImpl::Float32(array.try_into()?))
733 }
734
735 fn from_float32_array(
736 &self,
737 array: &arrow_array::Float32Array,
738 ) -> Result<ArrayImpl, ArrayError> {
739 Ok(ArrayImpl::Float32(array.into()))
740 }
741
742 fn from_float64_array(
743 &self,
744 array: &arrow_array::Float64Array,
745 ) -> Result<ArrayImpl, ArrayError> {
746 Ok(ArrayImpl::Float64(array.into()))
747 }
748
749 fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
750 Ok(ArrayImpl::Date(array.into()))
751 }
752
753 fn from_time64us_array(
754 &self,
755 array: &arrow_array::Time64MicrosecondArray,
756 ) -> Result<ArrayImpl, ArrayError> {
757 Ok(ArrayImpl::Time(array.into()))
758 }
759
760 fn from_timestampsecond_array(
761 &self,
762 array: &arrow_array::TimestampSecondArray,
763 ) -> Result<ArrayImpl, ArrayError> {
764 Ok(ArrayImpl::Timestamp(array.into()))
765 }
766 fn from_timestampsecond_some_array(
767 &self,
768 array: &arrow_array::TimestampSecondArray,
769 ) -> Result<ArrayImpl, ArrayError> {
770 Ok(ArrayImpl::Timestamptz(array.into()))
771 }
772
773 fn from_timestampms_array(
774 &self,
775 array: &arrow_array::TimestampMillisecondArray,
776 ) -> Result<ArrayImpl, ArrayError> {
777 Ok(ArrayImpl::Timestamp(array.into()))
778 }
779
780 fn from_timestampms_some_array(
781 &self,
782 array: &arrow_array::TimestampMillisecondArray,
783 ) -> Result<ArrayImpl, ArrayError> {
784 Ok(ArrayImpl::Timestamptz(array.into()))
785 }
786
787 fn from_timestampus_array(
788 &self,
789 array: &arrow_array::TimestampMicrosecondArray,
790 ) -> Result<ArrayImpl, ArrayError> {
791 Ok(ArrayImpl::Timestamp(array.into()))
792 }
793
794 fn from_timestampus_some_array(
795 &self,
796 array: &arrow_array::TimestampMicrosecondArray,
797 ) -> Result<ArrayImpl, ArrayError> {
798 Ok(ArrayImpl::Timestamptz(array.into()))
799 }
800
801 fn from_timestampns_array(
802 &self,
803 array: &arrow_array::TimestampNanosecondArray,
804 ) -> Result<ArrayImpl, ArrayError> {
805 Ok(ArrayImpl::Timestamp(array.into()))
806 }
807
808 fn from_timestampns_some_array(
809 &self,
810 array: &arrow_array::TimestampNanosecondArray,
811 ) -> Result<ArrayImpl, ArrayError> {
812 Ok(ArrayImpl::Timestamptz(array.into()))
813 }
814
815 fn from_interval_array(
816 &self,
817 array: &arrow_array::IntervalMonthDayNanoArray,
818 ) -> Result<ArrayImpl, ArrayError> {
819 Ok(ArrayImpl::Interval(array.into()))
820 }
821
822 fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
823 Ok(ArrayImpl::Utf8(array.into()))
824 }
825
826 fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
827 Ok(ArrayImpl::Bytea(array.into()))
828 }
829
830 fn from_large_utf8_array(
831 &self,
832 array: &arrow_array::LargeStringArray,
833 ) -> Result<ArrayImpl, ArrayError> {
834 Ok(ArrayImpl::Utf8(array.into()))
835 }
836
837 fn from_large_binary_array(
838 &self,
839 array: &arrow_array::LargeBinaryArray,
840 ) -> Result<ArrayImpl, ArrayError> {
841 Ok(ArrayImpl::Bytea(array.into()))
842 }
843
844 fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
845 use arrow_array::Array;
846 let arrow_schema::DataType::List(field) = array.data_type() else {
847 panic!("nested field types cannot be determined.");
848 };
849 Ok(ArrayImpl::List(ListArray {
850 value: Box::new(self.from_array(field, array.values())?),
851 bitmap: match array.nulls() {
852 Some(nulls) => nulls.iter().collect(),
853 None => Bitmap::ones(array.len()),
854 },
855 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
856 }))
857 }
858
859 fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
860 use arrow_array::Array;
861 let arrow_schema::DataType::Struct(fields) = array.data_type() else {
862 panic!("nested field types cannot be determined.");
863 };
864 Ok(ArrayImpl::Struct(StructArray::new(
865 self.from_fields(fields)?,
866 array
867 .columns()
868 .iter()
869 .zip_eq_fast(fields)
870 .map(|(array, field)| self.from_array(field, array).map(Arc::new))
871 .try_collect()?,
872 (0..array.len()).map(|i| array.is_valid(i)).collect(),
873 )))
874 }
875
876 fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
877 use arrow_array::Array;
878 let struct_array = self.from_struct_array(array.entries())?;
879 let list_array = ListArray {
880 value: Box::new(struct_array),
881 bitmap: match array.nulls() {
882 Some(nulls) => nulls.iter().collect(),
883 None => Bitmap::ones(array.len()),
884 },
885 offsets: array.offsets().iter().map(|o| *o as u32).collect(),
886 };
887
888 Ok(ArrayImpl::Map(MapArray { inner: list_array }))
889 }
890}
891
892impl From<&Bitmap> for arrow_buffer::NullBuffer {
893 fn from(bitmap: &Bitmap) -> Self {
894 bitmap.iter().collect()
895 }
896}
897
/// Generates three `From` conversions between a RisingWave array type and an arrow
/// array type:
///
/// * RisingWave -> arrow.
/// * arrow -> RisingWave.
/// * A slice of arrow arrays -> one concatenated RisingWave array.
///
/// The plain form passes elements through unchanged, so both arrays must iterate
/// compatible element types. The `@map` form converts every non-null element through
/// [`FromIntoArrow`], implemented on the RisingWave array's `RefItem` type.
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array.iter().collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .collect::<Self>()
            }
        }
    };
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow()))
                    .collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|item| {
                        item.map(|native| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(
                                native,
                            )
                        })
                    })
                    .collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|native| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(
                                native,
                            )
                        })
                    })
                    .collect::<Self>()
            }
        }
    };
}
951
/// Generates `From` conversions between a RisingWave integer array and an arrow integer
/// array with a different element type.
///
/// * `$ArrayType` / `$FromType`: the RisingWave array type and its element type.
/// * `$ArrowType` / `$ToType`: the arrow array type and its element type.
///
/// The arrow -> RisingWave direction widens (e.g. `u32` -> `i64`), which is lossless for
/// the invocations in this file. The RisingWave -> arrow direction uses an `as` cast.
/// `From` cannot fail, so that cast silently truncates or wraps values that do not fit
/// in `$ToType`. For example, `300_i16` becomes `44_i8`, and a negative `i64` wraps
/// when converted to `u32`.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                // Collect straight into the target array; no intermediate `Vec` is needed.
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
982
/// Generates `From` conversions between a RisingWave timestamp-like array and an arrow
/// timestamp array with a fixed `$time_unit`:
///
/// * RisingWave -> arrow.
/// * arrow -> RisingWave.
/// * A slice of arrow arrays -> one concatenated RisingWave array.
///
/// Every non-null element is converted through [`FromIntoArrowWithUnit`] on the
/// RisingWave array's `RefItem` type, passing `$time_unit` alongside each value.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|o| o.map(|v| v.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter())
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1019
1020converts!(BoolArray, arrow_array::BooleanArray);
1021converts!(I16Array, arrow_array::Int16Array);
1022converts!(I32Array, arrow_array::Int32Array);
1023converts!(I64Array, arrow_array::Int64Array);
1024converts!(F32Array, arrow_array::Float32Array, @map);
1025converts!(F64Array, arrow_array::Float64Array, @map);
1026converts!(BytesArray, arrow_array::BinaryArray);
1027converts!(BytesArray, arrow_array::LargeBinaryArray);
1028converts!(Utf8Array, arrow_array::StringArray);
1029converts!(Utf8Array, arrow_array::LargeStringArray);
1030converts!(DateArray, arrow_array::Date32Array, @map);
1031converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
1032converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
1033converts!(SerialArray, arrow_array::Int64Array, @map);
1034
1035converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
1036converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
1037converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
1038converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);
1039
1040converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1041converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
1042converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1043converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1044
1045converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1046converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
1047converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1048converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1049
/// Element-level conversion between a RisingWave scalar and the native value Arrow stores.
trait FromIntoArrow {
    /// Native Arrow value used to store this scalar.
    type ArrowType;

    /// Lowers this scalar into its Arrow native value.
    fn into_arrow(self) -> Self::ArrowType;

    /// Lifts an Arrow native value into this scalar.
    fn from_arrow(value: Self::ArrowType) -> Self;
}
1057
/// Element-level conversion whose Arrow encoding depends on a unit (e.g. a timestamp resolution).
trait FromIntoArrowWithUnit {
    /// Native Arrow value used to store this scalar.
    type ArrowType;
    /// The unit that selects how the native value is interpreted.
    type TimestampType;

    /// Lowers this scalar into its Arrow native value expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;

    /// Lifts an Arrow native value expressed in `time_unit` into this scalar.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
}
1067
1068impl FromIntoArrow for Serial {
1069 type ArrowType = i64;
1070
1071 fn from_arrow(value: Self::ArrowType) -> Self {
1072 value.into()
1073 }
1074
1075 fn into_arrow(self) -> Self::ArrowType {
1076 self.into()
1077 }
1078}
1079
1080impl FromIntoArrow for F32 {
1081 type ArrowType = f32;
1082
1083 fn from_arrow(value: Self::ArrowType) -> Self {
1084 value.into()
1085 }
1086
1087 fn into_arrow(self) -> Self::ArrowType {
1088 self.into()
1089 }
1090}
1091
1092impl FromIntoArrow for F64 {
1093 type ArrowType = f64;
1094
1095 fn from_arrow(value: Self::ArrowType) -> Self {
1096 value.into()
1097 }
1098
1099 fn into_arrow(self) -> Self::ArrowType {
1100 self.into()
1101 }
1102}
1103
1104impl FromIntoArrow for Date {
1105 type ArrowType = i32;
1106
1107 fn from_arrow(value: Self::ArrowType) -> Self {
1108 Date(arrow_array::types::Date32Type::to_naive_date(value))
1109 }
1110
1111 fn into_arrow(self) -> Self::ArrowType {
1112 arrow_array::types::Date32Type::from_naive_date(self.0)
1113 }
1114}
1115
1116impl FromIntoArrow for Time {
1117 type ArrowType = i64;
1118
1119 fn from_arrow(value: Self::ArrowType) -> Self {
1120 Time(
1121 NaiveTime::from_num_seconds_from_midnight_opt(
1122 (value / 1_000_000) as _,
1123 (value % 1_000_000 * 1000) as _,
1124 )
1125 .unwrap(),
1126 )
1127 }
1128
1129 fn into_arrow(self) -> Self::ArrowType {
1130 self.0
1131 .signed_duration_since(NaiveTime::default())
1132 .num_microseconds()
1133 .unwrap()
1134 }
1135}
1136
1137impl FromIntoArrowWithUnit for Timestamp {
1138 type ArrowType = i64;
1139 type TimestampType = TimeUnit;
1140
1141 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1142 match time_unit {
1143 TimeUnit::Second => {
1144 Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1145 }
1146 TimeUnit::Millisecond => {
1147 Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1148 }
1149 TimeUnit::Microsecond => {
1150 Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1151 }
1152 TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1153 }
1154 }
1155
1156 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1157 match time_unit {
1158 TimeUnit::Second => self.0.and_utc().timestamp(),
1159 TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1160 TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1161 TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1162 }
1163 }
1164}
1165
1166impl FromIntoArrowWithUnit for Timestamptz {
1167 type ArrowType = i64;
1168 type TimestampType = TimeUnit;
1169
1170 fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1171 match time_unit {
1172 TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1173 TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1174 TimeUnit::Microsecond => Timestamptz::from_micros(value),
1175 TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1176 }
1177 }
1178
1179 fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1180 match time_unit {
1181 TimeUnit::Second => self.timestamp(),
1182 TimeUnit::Millisecond => self.timestamp_millis(),
1183 TimeUnit::Microsecond => self.timestamp_micros(),
1184 TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1185 }
1186 }
1187}
1188
1189impl FromIntoArrow for Interval {
1190 type ArrowType = ArrowIntervalType;
1191
1192 fn from_arrow(value: Self::ArrowType) -> Self {
1193 <ArrowIntervalType as crate::array::arrow::ArrowIntervalTypeTrait>::to_interval(value)
1194 }
1195
1196 fn into_arrow(self) -> Self::ArrowType {
1197 <ArrowIntervalType as crate::array::arrow::ArrowIntervalTypeTrait>::from_interval(self)
1198 }
1199}
1200
1201impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1202 fn from(array: &DecimalArray) -> Self {
1203 let mut builder =
1204 arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1205 for value in array.iter() {
1206 builder.append_option(value.map(|d| d.to_string()));
1207 }
1208 builder.finish()
1209 }
1210}
1211
1212impl From<&DecimalArray> for arrow_array::StringArray {
1213 fn from(array: &DecimalArray) -> Self {
1214 let mut builder =
1215 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1216 for value in array.iter() {
1217 builder.append_option(value.map(|d| d.to_string()));
1218 }
1219 builder.finish()
1220 }
1221}
1222
1223impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1225 type Error = ArrayError;
1226
1227 fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1228 if array.scale() < 0 {
1229 bail!("support negative scale for arrow decimal")
1230 }
1231 let from_arrow = |value| {
1232 const NAN: i128 = i128::MIN + 1;
1233 let res = match value {
1234 NAN => Decimal::NaN,
1235 i128::MAX => Decimal::PositiveInf,
1236 i128::MIN => Decimal::NegativeInf,
1237 _ => Decimal::Normalized(
1238 rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1239 .map_err(ArrayError::internal)?,
1240 ),
1241 };
1242 Ok(res)
1243 };
1244 array
1245 .iter()
1246 .map(|o| o.map(from_arrow).transpose())
1247 .collect::<Result<Self, Self::Error>>()
1248 }
1249}
1250
1251impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1253 type Error = ArrayError;
1254
1255 fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1256 let from_arrow = |value| {
1257 let res = Decimal::from(value);
1259 Ok(res)
1260 };
1261
1262 array
1264 .iter()
1265 .map(|o| o.map(from_arrow).transpose())
1266 .collect::<Result<Self, Self::Error>>()
1267 }
1268}
1269
1270impl TryFrom<&arrow_array::Float16Array> for F32Array {
1271 type Error = ArrayError;
1272
1273 fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1274 let from_arrow = |value| Ok(f32::from(value));
1275
1276 array
1277 .iter()
1278 .map(|o| o.map(from_arrow).transpose())
1279 .collect::<Result<Self, Self::Error>>()
1280 }
1281}
1282
1283impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1284 type Error = ArrayError;
1285
1286 fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1287 array
1288 .iter()
1289 .map(|o| {
1290 o.map(|s| {
1291 let s = std::str::from_utf8(s)
1292 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1293 s.parse()
1294 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1295 })
1296 .transpose()
1297 })
1298 .try_collect()
1299 }
1300}
1301
1302impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1303 type Error = ArrayError;
1304
1305 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1306 array
1307 .iter()
1308 .map(|o| {
1309 o.map(|s| {
1310 s.parse()
1311 .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1312 })
1313 .transpose()
1314 })
1315 .try_collect()
1316 }
1317}
1318
1319impl From<&JsonbArray> for arrow_array::StringArray {
1320 fn from(array: &JsonbArray) -> Self {
1321 let mut builder =
1322 arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1323 for value in array.iter() {
1324 match value {
1325 Some(jsonb) => {
1326 write!(&mut builder, "{}", jsonb).unwrap();
1327 builder.append_value("");
1328 }
1329 None => builder.append_null(),
1330 }
1331 }
1332 builder.finish()
1333 }
1334}
1335
1336impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1337 type Error = ArrayError;
1338
1339 fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1340 array
1341 .iter()
1342 .map(|o| {
1343 o.map(|s| {
1344 s.parse()
1345 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1346 })
1347 .transpose()
1348 })
1349 .try_collect()
1350 }
1351}
1352
1353impl From<&JsonbArray> for arrow_array::LargeStringArray {
1354 fn from(array: &JsonbArray) -> Self {
1355 let mut builder =
1356 arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1357 for value in array.iter() {
1358 match value {
1359 Some(jsonb) => {
1360 write!(&mut builder, "{}", jsonb).unwrap();
1361 builder.append_value("");
1362 }
1363 None => builder.append_null(),
1364 }
1365 }
1366 builder.finish()
1367 }
1368}
1369
1370impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1371 type Error = ArrayError;
1372
1373 fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1374 array
1375 .iter()
1376 .map(|o| {
1377 o.map(|s| {
1378 s.parse()
1379 .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1380 })
1381 .transpose()
1382 })
1383 .try_collect()
1384 }
1385}
1386
1387impl From<arrow_buffer::i256> for Int256 {
1388 fn from(value: arrow_buffer::i256) -> Self {
1389 let buffer = value.to_be_bytes();
1390 Int256::from_be_bytes(buffer)
1391 }
1392}
1393
1394impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1395 fn from(val: Int256Ref<'a>) -> Self {
1396 let buffer = val.to_be_bytes();
1397 arrow_buffer::i256::from_be_bytes(buffer)
1398 }
1399}
1400
1401impl From<&Int256Array> for arrow_array::Decimal256Array {
1402 fn from(array: &Int256Array) -> Self {
1403 array
1404 .iter()
1405 .map(|o| o.map(arrow_buffer::i256::from))
1406 .collect()
1407 }
1408}
1409
1410impl From<&arrow_array::Decimal256Array> for Int256Array {
1411 fn from(array: &arrow_array::Decimal256Array) -> Self {
1412 let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1413
1414 values
1415 .iter()
1416 .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1417 .collect()
1418 }
1419}
1420
1421pub fn is_parquet_schema_match_source_schema(
1432 arrow_data_type: &arrow_schema::DataType,
1433 rw_data_type: &crate::types::DataType,
1434) -> bool {
1435 matches!(
1436 (arrow_data_type, rw_data_type),
1437 (arrow_schema::DataType::Boolean, RwDataType::Boolean)
1438 | (
1439 arrow_schema::DataType::Int8
1440 | arrow_schema::DataType::Int16
1441 | arrow_schema::DataType::UInt8,
1442 RwDataType::Int16
1443 )
1444 | (
1445 arrow_schema::DataType::Int32 | arrow_schema::DataType::UInt16,
1446 RwDataType::Int32
1447 )
1448 | (
1449 arrow_schema::DataType::Int64 | arrow_schema::DataType::UInt32,
1450 RwDataType::Int64
1451 )
1452 | (
1453 arrow_schema::DataType::UInt64 | arrow_schema::DataType::Decimal128(_, _),
1454 RwDataType::Decimal
1455 )
1456 | (arrow_schema::DataType::Decimal256(_, _), RwDataType::Int256)
1457 | (
1458 arrow_schema::DataType::Float16 | arrow_schema::DataType::Float32,
1459 RwDataType::Float32
1460 )
1461 | (arrow_schema::DataType::Float64, RwDataType::Float64)
1462 | (
1463 arrow_schema::DataType::Timestamp(_, None),
1464 RwDataType::Timestamp
1465 )
1466 | (
1467 arrow_schema::DataType::Timestamp(_, Some(_)),
1468 RwDataType::Timestamptz
1469 )
1470 | (arrow_schema::DataType::Date32, RwDataType::Date)
1471 | (
1472 arrow_schema::DataType::Time32(_) | arrow_schema::DataType::Time64(_),
1473 RwDataType::Time
1474 )
1475 | (
1476 arrow_schema::DataType::Interval(IntervalUnit::MonthDayNano),
1477 RwDataType::Interval
1478 )
1479 | (
1480 arrow_schema::DataType::Utf8 | arrow_schema::DataType::LargeUtf8,
1481 RwDataType::Varchar
1482 )
1483 | (
1484 arrow_schema::DataType::Binary | arrow_schema::DataType::LargeBinary,
1485 RwDataType::Bytea
1486 )
1487 | (arrow_schema::DataType::List(_), RwDataType::List(_))
1488 | (arrow_schema::DataType::Struct(_), RwDataType::Struct(_))
1489 | (arrow_schema::DataType::Map(_, _), RwDataType::Map(_))
1490 )
1491}
1492
#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    #[test]
    fn time() {
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    #[test]
    fn timestamp_second() {
        // A whole-second value survives a round trip through a second-resolution array.
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(1_700_000_000_000_000).ok()]);
        let arrow = arrow_array::TimestampSecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    #[test]
    fn timestamptz() {
        let array = TimestamptzArray::from_iter([
            None,
            Some(Timestamptz::from_micros(123456789012345678)),
        ]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestamptzArray::from(&arrow), array);
    }

    #[test]
    fn interval() {
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    #[test]
    fn decimal128() {
        // Raw values are scaled by the array's scale; the extreme i128 values are sentinels.
        let arrow = arrow_array::Decimal128Array::from(vec![
            None,
            Some(12345),
            Some(i128::MAX),
            Some(i128::MIN),
            Some(i128::MIN + 1),
        ])
        .with_precision_and_scale(38, 2)
        .unwrap();
        let expected = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("123.45".parse().unwrap())),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::NaN),
        ]);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), expected);

        // Negative scales are rejected rather than silently misread.
        let negative_scale = arrow_array::Decimal128Array::from(vec![Some(1)])
            .with_precision_and_scale(10, -2)
            .unwrap();
        assert!(DecimalArray::try_from(&negative_scale).is_err());
    }

    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    #[test]
    fn int256() {
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}