risingwave_common/array/arrow/
arrow_impl.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Converts between arrays and Apache Arrow arrays.
16//!
17//! This file acts as a template file for conversion code between
//! arrays and different versions of Apache Arrow.
19//!
20//! The conversion logic will be implemented for the arrow version specified in the outer mod by
21//! `super::arrow_xxx`, such as `super::arrow_array`.
22//!
23//! When we want to implement the conversion logic for an arrow version, we first
24//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx`
25//! using the `use` clause, and then declare a sub-mod and set its file path with attribute
26//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to
27//! the new mod file, and the conversion logic can be implemented for the corresponding arrow
28//! version.
29//!
//! An example can be seen in `arrow_default.rs`, which is also reproduced below:
31//! ```ignore
32//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
33//!
34//! #[allow(clippy::duplicate_mod)]
35//! #[path = "./arrow_impl.rs"]
36//! mod arrow_impl;
37//! ```
38
39// Is this a bug? Why do we have these lints?
40#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53// This is important because we want to use the arrow version specified by the outer mod.
54use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55// Other import should always use the absolute path.
56use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60/// Defines how to convert RisingWave arrays to Arrow arrays.
61///
62/// This trait allows for customized conversion logic for different external systems using Arrow.
63/// The default implementation is based on the `From` implemented in this mod.
64pub trait ToArrow {
65    /// Converts RisingWave `DataChunk` to Arrow `RecordBatch` with specified schema.
66    ///
67    /// This function will try to convert the array if the type is not same with the schema.
68    fn to_record_batch(
69        &self,
70        schema: arrow_schema::SchemaRef,
71        chunk: &DataChunk,
72    ) -> Result<arrow_array::RecordBatch, ArrayError> {
73        // compact the chunk if it's not compacted
74        if !chunk.is_vis_compacted() {
75            let c = chunk.clone();
76            return self.to_record_batch(schema, &c.compact_vis());
77        }
78
79        // convert each column to arrow array
80        let columns: Vec<_> = chunk
81            .columns()
82            .iter()
83            .zip_eq_fast(schema.fields().iter())
84            .map(|(column, field)| self.to_array(field.data_type(), column))
85            .try_collect()?;
86
87        // create record batch
88        let opts =
89            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91            .map_err(ArrayError::to_arrow)
92    }
93
94    /// Converts RisingWave array to Arrow array.
95    fn to_array(
96        &self,
97        data_type: &arrow_schema::DataType,
98        array: &ArrayImpl,
99    ) -> Result<arrow_array::ArrayRef, ArrayError> {
100        let arrow_array = match array {
101            ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102            ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103            ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104            ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105            ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106            ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107            ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108            ArrayImpl::Date(array) => self.date_to_arrow(array),
109            ArrayImpl::Time(array) => self.time_to_arrow(array),
110            ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111            ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112            ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113            ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114            ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115            ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116            ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117            ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118            ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119            ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121            ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122        }?;
123        if arrow_array.data_type() != data_type {
124            arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125        } else {
126            Ok(arrow_array)
127        }
128    }
129
130    #[inline]
131    fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132        Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133    }
134
135    #[inline]
136    fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137        Ok(Arc::new(arrow_array::Int16Array::from(array)))
138    }
139
140    #[inline]
141    fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142        Ok(Arc::new(arrow_array::Int32Array::from(array)))
143    }
144
145    #[inline]
146    fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147        Ok(Arc::new(arrow_array::Int64Array::from(array)))
148    }
149
150    #[inline]
151    fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152        Ok(Arc::new(arrow_array::Float32Array::from(array)))
153    }
154
155    #[inline]
156    fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157        Ok(Arc::new(arrow_array::Float64Array::from(array)))
158    }
159
160    #[inline]
161    fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162        Ok(Arc::new(arrow_array::StringArray::from(array)))
163    }
164
165    #[inline]
166    fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167        Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168    }
169
170    #[inline]
171    fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172        Ok(Arc::new(arrow_array::Date32Array::from(array)))
173    }
174
175    #[inline]
176    fn timestamp_to_arrow(
177        &self,
178        array: &TimestampArray,
179    ) -> Result<arrow_array::ArrayRef, ArrayError> {
180        Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181            array,
182        )))
183    }
184
185    #[inline]
186    fn timestamptz_to_arrow(
187        &self,
188        array: &TimestamptzArray,
189    ) -> Result<arrow_array::ArrayRef, ArrayError> {
190        Ok(Arc::new(
191            arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192        ))
193    }
194
195    #[inline]
196    fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197        Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198    }
199
200    #[inline]
201    fn interval_to_arrow(
202        &self,
203        array: &IntervalArray,
204    ) -> Result<arrow_array::ArrayRef, ArrayError> {
205        Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206            array,
207        )))
208    }
209
210    #[inline]
211    fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212        Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213    }
214
215    // Decimal values are stored as ASCII text representation in a string array.
216    #[inline]
217    fn decimal_to_arrow(
218        &self,
219        _data_type: &arrow_schema::DataType,
220        array: &DecimalArray,
221    ) -> Result<arrow_array::ArrayRef, ArrayError> {
222        Ok(Arc::new(arrow_array::StringArray::from(array)))
223    }
224
225    // JSON values are stored as text representation in a string array.
226    #[inline]
227    fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228        Ok(Arc::new(arrow_array::StringArray::from(array)))
229    }
230
231    #[inline]
232    fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233        Ok(Arc::new(arrow_array::Int64Array::from(array)))
234    }
235
236    #[inline]
237    fn list_to_arrow(
238        &self,
239        data_type: &arrow_schema::DataType,
240        array: &ListArray,
241    ) -> Result<arrow_array::ArrayRef, ArrayError> {
242        let arrow_schema::DataType::List(field) = data_type else {
243            return Err(ArrayError::to_arrow("Invalid list type"));
244        };
245        let values = self.to_array(field.data_type(), array.values())?;
246        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248        Ok(Arc::new(arrow_array::ListArray::new(
249            field.clone(),
250            offsets,
251            values,
252            nulls,
253        )))
254    }
255
256    #[inline]
257    fn vector_to_arrow(
258        &self,
259        data_type: &arrow_schema::DataType,
260        array: &VectorArray,
261    ) -> Result<arrow_array::ArrayRef, ArrayError> {
262        let arrow_schema::DataType::List(field) = data_type else {
263            return Err(ArrayError::to_arrow("Invalid list type"));
264        };
265        if field.data_type() != &arrow_schema::DataType::Float32 {
266            return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267        }
268        let values = Arc::new(arrow_array::Float32Array::from(
269            array.as_raw_slice().to_vec(),
270        ));
271        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273        Ok(Arc::new(arrow_array::ListArray::new(
274            field.clone(),
275            offsets,
276            values,
277            nulls,
278        )))
279    }
280
281    #[inline]
282    fn struct_to_arrow(
283        &self,
284        data_type: &arrow_schema::DataType,
285        array: &StructArray,
286    ) -> Result<arrow_array::ArrayRef, ArrayError> {
287        let arrow_schema::DataType::Struct(fields) = data_type else {
288            return Err(ArrayError::to_arrow("Invalid struct type"));
289        };
290        Ok(Arc::new(arrow_array::StructArray::new(
291            fields.clone(),
292            array
293                .fields()
294                .zip_eq_fast(fields)
295                .map(|(arr, field)| self.to_array(field.data_type(), arr))
296                .try_collect::<_, _, ArrayError>()?,
297            Some(array.null_bitmap().into()),
298        )))
299    }
300
301    #[inline]
302    fn map_to_arrow(
303        &self,
304        data_type: &arrow_schema::DataType,
305        array: &MapArray,
306    ) -> Result<arrow_array::ArrayRef, ArrayError> {
307        let arrow_schema::DataType::Map(field, ordered) = data_type else {
308            return Err(ArrayError::to_arrow("Invalid map type"));
309        };
310        if *ordered {
311            return Err(ArrayError::to_arrow("Sorted map is not supported"));
312        }
313        let values = self
314            .struct_to_arrow(field.data_type(), array.as_struct())?
315            .as_struct()
316            .clone();
317        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
318        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
319        Ok(Arc::new(arrow_array::MapArray::new(
320            field.clone(),
321            offsets,
322            values,
323            nulls,
324            *ordered,
325        )))
326    }
327
328    /// Convert RisingWave data type to Arrow data type.
329    ///
330    /// This function returns a `Field` instead of `DataType` because some may be converted to
331    /// extension types which require additional metadata in the field.
332    fn to_arrow_field(
333        &self,
334        name: &str,
335        value: &DataType,
336    ) -> Result<arrow_schema::Field, ArrayError> {
337        let data_type = match value {
338            // using the inline function
339            DataType::Boolean => self.bool_type_to_arrow(),
340            DataType::Int16 => self.int16_type_to_arrow(),
341            DataType::Int32 => self.int32_type_to_arrow(),
342            DataType::Int64 => self.int64_type_to_arrow(),
343            DataType::Int256 => self.int256_type_to_arrow(),
344            DataType::Float32 => self.float32_type_to_arrow(),
345            DataType::Float64 => self.float64_type_to_arrow(),
346            DataType::Date => self.date_type_to_arrow(),
347            DataType::Time => self.time_type_to_arrow(),
348            DataType::Timestamp => self.timestamp_type_to_arrow(),
349            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
350            DataType::Interval => self.interval_type_to_arrow(),
351            DataType::Varchar => self.varchar_type_to_arrow(),
352            DataType::Bytea => self.bytea_type_to_arrow(),
353            DataType::Serial => self.serial_type_to_arrow(),
354            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
355            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
356            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
357            DataType::List(list) => self.list_type_to_arrow(list)?,
358            DataType::Map(map) => self.map_type_to_arrow(map)?,
359            DataType::Vector(_) => self.vector_type_to_arrow()?,
360        };
361        Ok(arrow_schema::Field::new(name, data_type, true))
362    }
363
364    #[inline]
365    fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
366        arrow_schema::DataType::Boolean
367    }
368
369    #[inline]
370    fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
371        arrow_schema::DataType::Int16
372    }
373
374    #[inline]
375    fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
376        arrow_schema::DataType::Int32
377    }
378
379    #[inline]
380    fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
381        arrow_schema::DataType::Int64
382    }
383
384    #[inline]
385    fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
386        arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
387    }
388
389    #[inline]
390    fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
391        arrow_schema::DataType::Float32
392    }
393
394    #[inline]
395    fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
396        arrow_schema::DataType::Float64
397    }
398
399    #[inline]
400    fn date_type_to_arrow(&self) -> arrow_schema::DataType {
401        arrow_schema::DataType::Date32
402    }
403
404    #[inline]
405    fn time_type_to_arrow(&self) -> arrow_schema::DataType {
406        arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
407    }
408
409    #[inline]
410    fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
411        arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
412    }
413
414    #[inline]
415    fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
416        arrow_schema::DataType::Timestamp(
417            arrow_schema::TimeUnit::Microsecond,
418            Some("+00:00".into()),
419        )
420    }
421
422    #[inline]
423    fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
424        arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
425    }
426
427    #[inline]
428    fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
429        arrow_schema::DataType::Utf8
430    }
431
432    #[inline]
433    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
434        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
435            .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
436    }
437
438    #[inline]
439    fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
440        arrow_schema::DataType::Binary
441    }
442
443    #[inline]
444    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
445        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
446            .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
447    }
448
449    #[inline]
450    fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
451        arrow_schema::DataType::Int64
452    }
453
454    #[inline]
455    fn list_type_to_arrow(
456        &self,
457        list_type: &ListType,
458    ) -> Result<arrow_schema::DataType, ArrayError> {
459        Ok(arrow_schema::DataType::List(Arc::new(
460            self.to_arrow_field("item", list_type.elem())?,
461        )))
462    }
463
464    #[inline]
465    fn struct_type_to_arrow(
466        &self,
467        fields: &StructType,
468    ) -> Result<arrow_schema::DataType, ArrayError> {
469        Ok(arrow_schema::DataType::Struct(
470            fields
471                .iter()
472                .map(|(name, ty)| self.to_arrow_field(name, ty))
473                .try_collect::<_, _, ArrayError>()?,
474        ))
475    }
476
477    #[inline]
478    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
479        let sorted = false;
480        // "key" is always non-null
481        let key = self
482            .to_arrow_field("key", map_type.key())?
483            .with_nullable(false);
484        let value = self.to_arrow_field("value", map_type.value())?;
485        Ok(arrow_schema::DataType::Map(
486            Arc::new(arrow_schema::Field::new(
487                "entries",
488                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
489                // "entries" is always non-null
490                false,
491            )),
492            sorted,
493        ))
494    }
495
496    #[inline]
497    fn vector_type_to_arrow(&self) -> Result<arrow_schema::DataType, ArrayError> {
498        Ok(arrow_schema::DataType::List(Arc::new(
499            self.to_arrow_field("item", &VECTOR_ITEM_TYPE)?,
500        )))
501    }
502}
503
504/// Defines how to convert Arrow arrays to RisingWave arrays.
505#[allow(clippy::wrong_self_convention)]
506pub trait FromArrow {
507    /// Converts Arrow `RecordBatch` to RisingWave `DataChunk`.
508    fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
509        let mut columns = Vec::with_capacity(batch.num_columns());
510        for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
511            let column = Arc::new(self.from_array(field, array)?);
512            columns.push(column);
513        }
514        Ok(DataChunk::new(columns, batch.num_rows()))
515    }
516
517    /// Converts Arrow `Fields` to RisingWave `StructType`.
518    fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
519        Ok(StructType::new(
520            fields
521                .iter()
522                .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
523                .try_collect::<_, Vec<_>, ArrayError>()?,
524        ))
525    }
526
527    /// Converts Arrow `Field` to RisingWave `DataType`.
528    fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
529        use arrow_schema::DataType::*;
530        use arrow_schema::IntervalUnit::*;
531        use arrow_schema::TimeUnit::*;
532
533        // extension type
534        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
535            return self.from_extension_type(type_name, field.data_type());
536        }
537
538        Ok(match field.data_type() {
539            Boolean => DataType::Boolean,
540            Int16 => DataType::Int16,
541            Int32 => DataType::Int32,
542            Int64 => DataType::Int64,
543            Int8 => DataType::Int16,
544            UInt8 => DataType::Int16,
545            UInt16 => DataType::Int32,
546            UInt32 => DataType::Int64,
547            UInt64 => DataType::Decimal,
548            Float16 => DataType::Float32,
549            Float32 => DataType::Float32,
550            Float64 => DataType::Float64,
551            Decimal128(_, _) => DataType::Decimal,
552            Decimal256(_, _) => DataType::Int256,
553            Date32 => DataType::Date,
554            Time64(Microsecond) => DataType::Time,
555            Timestamp(Microsecond, None) => DataType::Timestamp,
556            Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
557            Timestamp(Second, None) => DataType::Timestamp,
558            Timestamp(Second, Some(_)) => DataType::Timestamptz,
559            Timestamp(Millisecond, None) => DataType::Timestamp,
560            Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
561            Timestamp(Nanosecond, None) => DataType::Timestamp,
562            Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
563            Interval(MonthDayNano) => DataType::Interval,
564            Utf8 => DataType::Varchar,
565            Binary => DataType::Bytea,
566            LargeUtf8 => self.from_large_utf8()?,
567            LargeBinary => self.from_large_binary()?,
568            List(field) => DataType::list(self.from_field(field)?),
569            Struct(fields) => DataType::Struct(self.from_fields(fields)?),
570            Map(field, _is_sorted) => {
571                let entries = self.from_field(field)?;
572                DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
573                    ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
574                })?)
575            }
576            t => {
577                return Err(ArrayError::from_arrow(format!(
578                    "unsupported arrow data type: {t:?}"
579                )));
580            }
581        })
582    }
583
584    /// Converts Arrow `LargeUtf8` type to RisingWave data type.
585    fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
586        Ok(DataType::Varchar)
587    }
588
589    /// Converts Arrow `LargeBinary` type to RisingWave data type.
590    fn from_large_binary(&self) -> Result<DataType, ArrayError> {
591        Ok(DataType::Bytea)
592    }
593
594    /// Converts Arrow extension type to RisingWave `DataType`.
595    fn from_extension_type(
596        &self,
597        type_name: &str,
598        physical_type: &arrow_schema::DataType,
599    ) -> Result<DataType, ArrayError> {
600        match (type_name, physical_type) {
601            ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
602            ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
603            _ => Err(ArrayError::from_arrow(format!(
604                "unsupported extension type: {type_name:?}"
605            ))),
606        }
607    }
608
609    /// Converts Arrow `Array` to RisingWave `ArrayImpl`.
610    fn from_array(
611        &self,
612        field: &arrow_schema::Field,
613        array: &arrow_array::ArrayRef,
614    ) -> Result<ArrayImpl, ArrayError> {
615        use arrow_schema::DataType::*;
616        use arrow_schema::IntervalUnit::*;
617        use arrow_schema::TimeUnit::*;
618
619        // extension type
620        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
621            return self.from_extension_array(type_name, array);
622        }
623        match array.data_type() {
624            Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
625            Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
626            Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
627            Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
628            Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
629            UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
630            UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
631            UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
632
633            UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
634            Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
635            Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
636            Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
637            Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
638            Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
639            Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
640            Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
641            Timestamp(Second, None) => {
642                self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
643            }
644            Timestamp(Second, Some(_)) => {
645                self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
646            }
647            Timestamp(Millisecond, None) => {
648                self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
649            }
650            Timestamp(Millisecond, Some(_)) => {
651                self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
652            }
653            Timestamp(Microsecond, None) => {
654                self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
655            }
656            Timestamp(Microsecond, Some(_)) => {
657                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
658            }
659            Timestamp(Nanosecond, None) => {
660                self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
661            }
662            Timestamp(Nanosecond, Some(_)) => {
663                self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
664            }
665            Interval(MonthDayNano) => {
666                self.from_interval_array(array.as_any().downcast_ref().unwrap())
667            }
668            Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
669            Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
670            LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
671            LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
672            List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
673            Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
674            Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
675            t => Err(ArrayError::from_arrow(format!(
676                "unsupported arrow data type: {t:?}",
677            ))),
678        }
679    }
680
681    /// Converts Arrow extension array to RisingWave `ArrayImpl`.
682    fn from_extension_array(
683        &self,
684        type_name: &str,
685        array: &arrow_array::ArrayRef,
686    ) -> Result<ArrayImpl, ArrayError> {
687        match type_name {
688            "arrowudf.decimal" => {
689                let array: &arrow_array::StringArray =
690                    array.as_any().downcast_ref().ok_or_else(|| {
691                        ArrayError::from_arrow(
692                            "expected string array for `arrowudf.decimal`".to_owned(),
693                        )
694                    })?;
695                Ok(ArrayImpl::Decimal(array.try_into()?))
696            }
697            "arrowudf.json" => {
698                let array: &arrow_array::StringArray =
699                    array.as_any().downcast_ref().ok_or_else(|| {
700                        ArrayError::from_arrow(
701                            "expected string array for `arrowudf.json`".to_owned(),
702                        )
703                    })?;
704                Ok(ArrayImpl::Jsonb(array.try_into()?))
705            }
706            _ => Err(ArrayError::from_arrow(format!(
707                "unsupported extension type: {type_name:?}"
708            ))),
709        }
710    }
711
712    fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
713        Ok(ArrayImpl::Bool(array.into()))
714    }
715
716    fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
717        Ok(ArrayImpl::Int16(array.into()))
718    }
719
720    fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
721        Ok(ArrayImpl::Int16(array.into()))
722    }
723
724    fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
725        Ok(ArrayImpl::Int16(array.into()))
726    }
727
728    fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
729        Ok(ArrayImpl::Int32(array.into()))
730    }
731
732    fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
733        Ok(ArrayImpl::Int64(array.into()))
734    }
735
736    fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
737        Ok(ArrayImpl::Int32(array.into()))
738    }
739
740    fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
741        Ok(ArrayImpl::Int64(array.into()))
742    }
743
744    fn from_int256_array(
745        &self,
746        array: &arrow_array::Decimal256Array,
747    ) -> Result<ArrayImpl, ArrayError> {
748        Ok(ArrayImpl::Int256(array.into()))
749    }
750
751    fn from_decimal128_array(
752        &self,
753        array: &arrow_array::Decimal128Array,
754    ) -> Result<ArrayImpl, ArrayError> {
755        Ok(ArrayImpl::Decimal(array.try_into()?))
756    }
757
758    fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
759        Ok(ArrayImpl::Decimal(array.try_into()?))
760    }
761
762    fn from_float16_array(
763        &self,
764        array: &arrow_array::Float16Array,
765    ) -> Result<ArrayImpl, ArrayError> {
766        Ok(ArrayImpl::Float32(array.try_into()?))
767    }
768
769    fn from_float32_array(
770        &self,
771        array: &arrow_array::Float32Array,
772    ) -> Result<ArrayImpl, ArrayError> {
773        Ok(ArrayImpl::Float32(array.into()))
774    }
775
776    fn from_float64_array(
777        &self,
778        array: &arrow_array::Float64Array,
779    ) -> Result<ArrayImpl, ArrayError> {
780        Ok(ArrayImpl::Float64(array.into()))
781    }
782
783    fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
784        Ok(ArrayImpl::Date(array.into()))
785    }
786
787    fn from_time64us_array(
788        &self,
789        array: &arrow_array::Time64MicrosecondArray,
790    ) -> Result<ArrayImpl, ArrayError> {
791        Ok(ArrayImpl::Time(array.into()))
792    }
793
794    fn from_timestampsecond_array(
795        &self,
796        array: &arrow_array::TimestampSecondArray,
797    ) -> Result<ArrayImpl, ArrayError> {
798        Ok(ArrayImpl::Timestamp(array.into()))
799    }
800    fn from_timestampsecond_some_array(
801        &self,
802        array: &arrow_array::TimestampSecondArray,
803    ) -> Result<ArrayImpl, ArrayError> {
804        Ok(ArrayImpl::Timestamptz(array.into()))
805    }
806
807    fn from_timestampms_array(
808        &self,
809        array: &arrow_array::TimestampMillisecondArray,
810    ) -> Result<ArrayImpl, ArrayError> {
811        Ok(ArrayImpl::Timestamp(array.into()))
812    }
813
814    fn from_timestampms_some_array(
815        &self,
816        array: &arrow_array::TimestampMillisecondArray,
817    ) -> Result<ArrayImpl, ArrayError> {
818        Ok(ArrayImpl::Timestamptz(array.into()))
819    }
820
821    fn from_timestampus_array(
822        &self,
823        array: &arrow_array::TimestampMicrosecondArray,
824    ) -> Result<ArrayImpl, ArrayError> {
825        Ok(ArrayImpl::Timestamp(array.into()))
826    }
827
828    fn from_timestampus_some_array(
829        &self,
830        array: &arrow_array::TimestampMicrosecondArray,
831    ) -> Result<ArrayImpl, ArrayError> {
832        Ok(ArrayImpl::Timestamptz(array.into()))
833    }
834
835    fn from_timestampns_array(
836        &self,
837        array: &arrow_array::TimestampNanosecondArray,
838    ) -> Result<ArrayImpl, ArrayError> {
839        Ok(ArrayImpl::Timestamp(array.into()))
840    }
841
842    fn from_timestampns_some_array(
843        &self,
844        array: &arrow_array::TimestampNanosecondArray,
845    ) -> Result<ArrayImpl, ArrayError> {
846        Ok(ArrayImpl::Timestamptz(array.into()))
847    }
848
849    fn from_interval_array(
850        &self,
851        array: &arrow_array::IntervalMonthDayNanoArray,
852    ) -> Result<ArrayImpl, ArrayError> {
853        Ok(ArrayImpl::Interval(array.into()))
854    }
855
856    fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
857        Ok(ArrayImpl::Utf8(array.into()))
858    }
859
860    fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
861        Ok(ArrayImpl::Bytea(array.into()))
862    }
863
864    fn from_large_utf8_array(
865        &self,
866        array: &arrow_array::LargeStringArray,
867    ) -> Result<ArrayImpl, ArrayError> {
868        Ok(ArrayImpl::Utf8(array.into()))
869    }
870
871    fn from_large_binary_array(
872        &self,
873        array: &arrow_array::LargeBinaryArray,
874    ) -> Result<ArrayImpl, ArrayError> {
875        Ok(ArrayImpl::Bytea(array.into()))
876    }
877
878    fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
879        use arrow_array::Array;
880        let arrow_schema::DataType::List(field) = array.data_type() else {
881            panic!("nested field types cannot be determined.");
882        };
883        Ok(ArrayImpl::List(ListArray {
884            value: Box::new(self.from_array(field, array.values())?),
885            bitmap: match array.nulls() {
886                Some(nulls) => nulls.iter().collect(),
887                None => Bitmap::ones(array.len()),
888            },
889            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
890        }))
891    }
892
893    fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
894        use arrow_array::Array;
895        let arrow_schema::DataType::Struct(fields) = array.data_type() else {
896            panic!("nested field types cannot be determined.");
897        };
898        Ok(ArrayImpl::Struct(StructArray::new(
899            self.from_fields(fields)?,
900            array
901                .columns()
902                .iter()
903                .zip_eq_fast(fields)
904                .map(|(array, field)| self.from_array(field, array).map(Arc::new))
905                .try_collect()?,
906            (0..array.len()).map(|i| array.is_valid(i)).collect(),
907        )))
908    }
909
910    fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
911        use arrow_array::Array;
912        let struct_array = self.from_struct_array(array.entries())?;
913        let list_array = ListArray {
914            value: Box::new(struct_array),
915            bitmap: match array.nulls() {
916                Some(nulls) => nulls.iter().collect(),
917                None => Bitmap::ones(array.len()),
918            },
919            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
920        };
921
922        Ok(ArrayImpl::Map(MapArray { inner: list_array }))
923    }
924}
925
926impl From<&Bitmap> for arrow_buffer::NullBuffer {
927    fn from(bitmap: &Bitmap) -> Self {
928        bitmap.iter().collect()
929    }
930}
931
/// Implements bi-directional `From` between a concrete array type and an Arrow array type,
/// plus a `From<&[Arrow]>` that concatenates several Arrow arrays into one array.
///
/// * `converts!(Rw, Arrow)` re-collects the items unchanged.
/// * `converts!(Rw, Arrow, @map)` converts every non-null item through `FromIntoArrow`.
macro_rules! converts {
    // Both sides yield the same item type: collect as-is.
    ($RwArray:ty, $ArrowArray:ty) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(rw: &$RwArray) -> Self {
                rw.iter().collect::<Self>()
            }
        }
        impl From<&$ArrowArray> for $RwArray {
            fn from(arrow: &$ArrowArray) -> Self {
                arrow.iter().collect::<Self>()
            }
        }
        impl From<&[$ArrowArray]> for $RwArray {
            fn from(chunks: &[$ArrowArray]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .collect::<Self>()
            }
        }
    };
    // Item types differ: map every non-null value through `FromIntoArrow`.
    ($RwArray:ty, $ArrowArray:ty, @map) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(rw: &$RwArray) -> Self {
                rw.iter()
                    .map(|item| item.map(|value| value.into_arrow()))
                    .collect::<Self>()
            }
        }
        impl From<&$ArrowArray> for $RwArray {
            fn from(arrow: &$ArrowArray) -> Self {
                arrow
                    .iter()
                    .map(|item| {
                        item.map(|value| {
                            <<$RwArray as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                        })
                    })
                    .collect::<Self>()
            }
        }
        impl From<&[$ArrowArray]> for $RwArray {
            fn from(chunks: &[$ArrowArray]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|value| {
                            <<$RwArray as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                        })
                    })
                    .collect::<Self>()
            }
        }
    };
}
985
/// Implements bi-directional `From` between an array type and an Arrow array type whose
/// element types differ only by a primitive numeric cast, plus a `From<&[Arrow]>` that
/// concatenates several Arrow arrays.
///
/// * `$FromType` is the element type of `$ArrayType`.
/// * `$ToType` is the element type of `$ArrowType`.
///
/// Values are converted with `as`. Casting towards `$FromType` is lossless when it is the
/// wider type, but casting towards `$ToType` silently truncates or wraps out-of-range values.
/// Nulls are preserved in both directions.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                // Collect straight into the target; no intermediate `Vec` is needed.
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
1016
/// Implements bi-directional `From` between an array type and an Arrow timestamp array of a
/// fixed time unit, plus a `From<&[Arrow]>` that concatenates several Arrow arrays.
///
/// Every non-null value is converted through `FromIntoArrowWithUnit`, passing `$unit` along.
macro_rules! converts_with_timeunit {
    ($RwArray:ty, $ArrowArray:ty, $unit:expr, @map) => {
        impl From<&$RwArray> for $ArrowArray {
            fn from(rw: &$RwArray) -> Self {
                rw.iter()
                    .map(|item| item.map(|value| value.into_arrow_with_unit($unit)))
                    .collect::<Self>()
            }
        }

        impl From<&$ArrowArray> for $RwArray {
            fn from(arrow: &$ArrowArray) -> Self {
                arrow
                    .iter()
                    .map(|item| {
                        item.map(|value| {
                            <<$RwArray as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(value, $unit)
                        })
                    })
                    .collect::<Self>()
            }
        }

        impl From<&[$ArrowArray]> for $RwArray {
            fn from(chunks: &[$ArrowArray]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|value| {
                            <<$RwArray as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(value, $unit)
                        })
                    })
                    .collect::<Self>()
            }
        }
    };
}
1053
1054converts!(BoolArray, arrow_array::BooleanArray);
1055converts!(I16Array, arrow_array::Int16Array);
1056converts!(I32Array, arrow_array::Int32Array);
1057converts!(I64Array, arrow_array::Int64Array);
1058converts!(F32Array, arrow_array::Float32Array, @map);
1059converts!(F64Array, arrow_array::Float64Array, @map);
1060converts!(BytesArray, arrow_array::BinaryArray);
1061converts!(BytesArray, arrow_array::LargeBinaryArray);
1062converts!(Utf8Array, arrow_array::StringArray);
1063converts!(Utf8Array, arrow_array::LargeStringArray);
1064converts!(DateArray, arrow_array::Date32Array, @map);
1065converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
1066converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
1067converts!(SerialArray, arrow_array::Int64Array, @map);
1068
1069converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
1070converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
1071converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
1072converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);
1073
1074converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1075converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
1076converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1077converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1078
1079converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1080converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
1081converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1082converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1083
/// Value-level conversion between a RisingWave scalar and its Arrow element representation.
trait FromIntoArrow {
    /// The element type used on the Arrow side.
    type ArrowType;

    /// Turns this value into its Arrow representation.
    fn into_arrow(self) -> Self::ArrowType;

    /// Rebuilds a value from its Arrow representation.
    fn from_arrow(value: Self::ArrowType) -> Self;
}
1091
/// Value-level conversion between a RisingWave scalar and its Arrow element representation
/// when the conversion depends on a time unit (used for the timestamp types).
trait FromIntoArrowWithUnit {
    /// The element type used on the Arrow side.
    type ArrowType;
    /// The type that distinguishes time units; only meaningful for Arrow timestamp types.
    type TimestampType;

    /// Turns this value into its Arrow representation expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;

    /// Rebuilds a value from an Arrow representation expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
}
1101
1102impl FromIntoArrow for Serial {
1103    type ArrowType = i64;
1104
1105    fn from_arrow(value: Self::ArrowType) -> Self {
1106        value.into()
1107    }
1108
1109    fn into_arrow(self) -> Self::ArrowType {
1110        self.into()
1111    }
1112}
1113
1114impl FromIntoArrow for F32 {
1115    type ArrowType = f32;
1116
1117    fn from_arrow(value: Self::ArrowType) -> Self {
1118        value.into()
1119    }
1120
1121    fn into_arrow(self) -> Self::ArrowType {
1122        self.into()
1123    }
1124}
1125
1126impl FromIntoArrow for F64 {
1127    type ArrowType = f64;
1128
1129    fn from_arrow(value: Self::ArrowType) -> Self {
1130        value.into()
1131    }
1132
1133    fn into_arrow(self) -> Self::ArrowType {
1134        self.into()
1135    }
1136}
1137
1138impl FromIntoArrow for Date {
1139    type ArrowType = i32;
1140
1141    fn from_arrow(value: Self::ArrowType) -> Self {
1142        Date(arrow_array::types::Date32Type::to_naive_date(value))
1143    }
1144
1145    fn into_arrow(self) -> Self::ArrowType {
1146        arrow_array::types::Date32Type::from_naive_date(self.0)
1147    }
1148}
1149
1150impl FromIntoArrow for Time {
1151    type ArrowType = i64;
1152
1153    fn from_arrow(value: Self::ArrowType) -> Self {
1154        Time(
1155            NaiveTime::from_num_seconds_from_midnight_opt(
1156                (value / 1_000_000) as _,
1157                (value % 1_000_000 * 1000) as _,
1158            )
1159            .unwrap(),
1160        )
1161    }
1162
1163    fn into_arrow(self) -> Self::ArrowType {
1164        self.0
1165            .signed_duration_since(NaiveTime::default())
1166            .num_microseconds()
1167            .unwrap()
1168    }
1169}
1170
1171impl FromIntoArrowWithUnit for Timestamp {
1172    type ArrowType = i64;
1173    type TimestampType = TimeUnit;
1174
1175    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1176        match time_unit {
1177            TimeUnit::Second => {
1178                Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1179            }
1180            TimeUnit::Millisecond => {
1181                Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1182            }
1183            TimeUnit::Microsecond => {
1184                Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1185            }
1186            TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1187        }
1188    }
1189
1190    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1191        match time_unit {
1192            TimeUnit::Second => self.0.and_utc().timestamp(),
1193            TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1194            TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1195            TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1196        }
1197    }
1198}
1199
1200impl FromIntoArrowWithUnit for Timestamptz {
1201    type ArrowType = i64;
1202    type TimestampType = TimeUnit;
1203
1204    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1205        match time_unit {
1206            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1207            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1208            TimeUnit::Microsecond => Timestamptz::from_micros(value),
1209            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1210        }
1211    }
1212
1213    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1214        match time_unit {
1215            TimeUnit::Second => self.timestamp(),
1216            TimeUnit::Millisecond => self.timestamp_millis(),
1217            TimeUnit::Microsecond => self.timestamp_micros(),
1218            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1219        }
1220    }
1221}
1222
1223impl FromIntoArrow for Interval {
1224    type ArrowType = ArrowIntervalType;
1225
1226    fn from_arrow(value: Self::ArrowType) -> Self {
1227        Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1228    }
1229
1230    fn into_arrow(self) -> Self::ArrowType {
1231        ArrowIntervalType {
1232            months: self.months(),
1233            days: self.days(),
1234            // TODO: this may overflow and we need `try_into`
1235            nanoseconds: self.usecs() * 1000,
1236        }
1237    }
1238}
1239
1240impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1241    fn from(array: &DecimalArray) -> Self {
1242        let mut builder =
1243            arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1244        for value in array.iter() {
1245            builder.append_option(value.map(|d| d.to_string()));
1246        }
1247        builder.finish()
1248    }
1249}
1250
1251impl From<&DecimalArray> for arrow_array::StringArray {
1252    fn from(array: &DecimalArray) -> Self {
1253        let mut builder =
1254            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1255        for value in array.iter() {
1256            builder.append_option(value.map(|d| d.to_string()));
1257        }
1258        builder.finish()
1259    }
1260}
1261
1262// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
1263impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1264    type Error = ArrayError;
1265
1266    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1267        if array.scale() < 0 {
1268            bail!("support negative scale for arrow decimal")
1269        }
1270
1271        // Calculate the max value based on the Arrow decimal's precision
1272        // When writing Inf to Arrow Decimal128(precision, scale), we use 10^precision - 1
1273        let precision = array.precision();
1274        let max_value = 10_i128.pow(precision as u32) - 1;
1275
1276        let from_arrow = |value| {
1277            const NAN: i128 = i128::MIN + 1;
1278            let res = match value {
1279                // Check for special values using Arrow Decimal's max value, not i128::MAX
1280                NAN => Decimal::NaN,
1281                v if v == max_value => Decimal::PositiveInf,
1282                v if v == -max_value => Decimal::NegativeInf,
1283                i128::MAX => Decimal::PositiveInf, // Fallback for old data
1284                i128::MIN => Decimal::NegativeInf, // Fallback for old data
1285                _ => Decimal::truncated_i128_and_scale(value, array.scale() as u32)
1286                    .ok_or_else(|| ArrayError::from_arrow("decimal overflow"))?,
1287            };
1288            Ok(res)
1289        };
1290        array
1291            .iter()
1292            .map(|o| o.map(from_arrow).transpose())
1293            .collect::<Result<Self, Self::Error>>()
1294    }
1295}
1296
1297// Since RisingWave does not support UInt type, convert UInt64Array to Decimal.
1298impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1299    type Error = ArrayError;
1300
1301    fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1302        let from_arrow = |value| {
1303            // Convert the value to a Decimal with scale 0
1304            let res = Decimal::from(value);
1305            Ok(res)
1306        };
1307
1308        // Map over the array and convert each value
1309        array
1310            .iter()
1311            .map(|o| o.map(from_arrow).transpose())
1312            .collect::<Result<Self, Self::Error>>()
1313    }
1314}
1315
1316impl TryFrom<&arrow_array::Float16Array> for F32Array {
1317    type Error = ArrayError;
1318
1319    fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1320        let from_arrow = |value| Ok(f32::from(value));
1321
1322        array
1323            .iter()
1324            .map(|o| o.map(from_arrow).transpose())
1325            .collect::<Result<Self, Self::Error>>()
1326    }
1327}
1328
1329impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1330    type Error = ArrayError;
1331
1332    fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1333        array
1334            .iter()
1335            .map(|o| {
1336                o.map(|s| {
1337                    let s = std::str::from_utf8(s)
1338                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1339                    s.parse()
1340                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1341                })
1342                .transpose()
1343            })
1344            .try_collect()
1345    }
1346}
1347
1348impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1349    type Error = ArrayError;
1350
1351    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1352        array
1353            .iter()
1354            .map(|o| {
1355                o.map(|s| {
1356                    s.parse()
1357                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1358                })
1359                .transpose()
1360            })
1361            .try_collect()
1362    }
1363}
1364
1365impl From<&JsonbArray> for arrow_array::StringArray {
1366    fn from(array: &JsonbArray) -> Self {
1367        let mut builder =
1368            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1369        for value in array.iter() {
1370            match value {
1371                Some(jsonb) => {
1372                    write!(&mut builder, "{}", jsonb).unwrap();
1373                    builder.append_value("");
1374                }
1375                None => builder.append_null(),
1376            }
1377        }
1378        builder.finish()
1379    }
1380}
1381
1382impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1383    type Error = ArrayError;
1384
1385    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1386        array
1387            .iter()
1388            .map(|o| {
1389                o.map(|s| {
1390                    s.parse()
1391                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1392                })
1393                .transpose()
1394            })
1395            .try_collect()
1396    }
1397}
1398
1399impl From<&IntervalArray> for arrow_array::StringArray {
1400    fn from(array: &IntervalArray) -> Self {
1401        let mut builder =
1402            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1403        for value in array.iter() {
1404            match value {
1405                Some(interval) => {
1406                    write!(&mut builder, "{}", interval).unwrap();
1407                    builder.append_value("");
1408                }
1409                None => builder.append_null(),
1410            }
1411        }
1412        builder.finish()
1413    }
1414}
1415
1416impl From<&JsonbArray> for arrow_array::LargeStringArray {
1417    fn from(array: &JsonbArray) -> Self {
1418        let mut builder =
1419            arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1420        for value in array.iter() {
1421            match value {
1422                Some(jsonb) => {
1423                    write!(&mut builder, "{}", jsonb).unwrap();
1424                    builder.append_value("");
1425                }
1426                None => builder.append_null(),
1427            }
1428        }
1429        builder.finish()
1430    }
1431}
1432
1433impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1434    type Error = ArrayError;
1435
1436    fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1437        array
1438            .iter()
1439            .map(|o| {
1440                o.map(|s| {
1441                    s.parse()
1442                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1443                })
1444                .transpose()
1445            })
1446            .try_collect()
1447    }
1448}
1449
1450impl From<arrow_buffer::i256> for Int256 {
1451    fn from(value: arrow_buffer::i256) -> Self {
1452        let buffer = value.to_be_bytes();
1453        Int256::from_be_bytes(buffer)
1454    }
1455}
1456
1457impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1458    fn from(val: Int256Ref<'a>) -> Self {
1459        let buffer = val.to_be_bytes();
1460        arrow_buffer::i256::from_be_bytes(buffer)
1461    }
1462}
1463
1464impl From<&Int256Array> for arrow_array::Decimal256Array {
1465    fn from(array: &Int256Array) -> Self {
1466        array
1467            .iter()
1468            .map(|o| o.map(arrow_buffer::i256::from))
1469            .collect()
1470    }
1471}
1472
1473impl From<&arrow_array::Decimal256Array> for Int256Array {
1474    fn from(array: &arrow_array::Decimal256Array) -> Self {
1475        let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1476
1477        values
1478            .iter()
1479            .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1480            .collect()
1481    }
1482}
1483
1484/// This function checks whether the schema of a Parquet file matches the user-defined schema in RisingWave.
1485/// It handles the following special cases:
1486/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `Timestamp` type.
1487/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `Timestamptz` type.
1488/// - Since RisingWave does not have an `UInt` type:
1489///   - Arrow's `UInt8` matches with RisingWave's `Int16`.
1490///   - Arrow's `UInt16` matches with RisingWave's `Int32`.
1491///   - Arrow's `UInt32` matches with RisingWave's `Int64`.
1492///   - Arrow's `UInt64` matches with RisingWave's `Decimal`.
1493/// - Arrow's `Float16` matches with RisingWave's `Float32`.
1494///
1495/// Nested data type matching:
1496/// - Struct: Arrow's `Struct` type matches with RisingWave's `Struct` type recursively, requiring the same field names and types.
1497/// - List: Arrow's `List` type matches with RisingWave's `List` type recursively, requiring the same element type.
1498/// - Map: Arrow's `Map` type matches with RisingWave's `Map` type recursively, requiring the key and value types to match, and the inner struct must have exactly two fields named "key" and "value".
1499pub fn is_parquet_schema_match_source_schema(
1500    arrow_data_type: &arrow_schema::DataType,
1501    rw_data_type: &crate::types::DataType,
1502) -> bool {
1503    use arrow_schema::DataType as ArrowType;
1504
1505    use crate::types::{DataType as RwType, MapType, StructType};
1506
1507    match (arrow_data_type, rw_data_type) {
1508        // Primitive type matching and special cases
1509        (ArrowType::Boolean, RwType::Boolean)
1510        | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1511        | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1512        | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1513        | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1514        | (ArrowType::Decimal256(_, _), RwType::Int256)
1515        | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1516        | (ArrowType::Float64, RwType::Float64)
1517        | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1518        | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1519        | (ArrowType::Date32, RwType::Date)
1520        | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1521        | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1522        | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1523        | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1524
1525        // Struct type recursive matching
1526        // Arrow's Struct matches RisingWave's Struct if all field names and types match recursively
1527        (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1528            if arrow_fields.len() != rw_struct.len() {
1529                return false;
1530            }
1531            for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1532            {
1533                if arrow_field.name() != rw_name {
1534                    return false;
1535                }
1536                if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1537                    return false;
1538                }
1539            }
1540            true
1541        }
1542        // List type recursive matching
1543        // Arrow's List matches RisingWave's List if the element type matches recursively
1544        (ArrowType::List(arrow_field), RwType::List(rw_list_ty)) => {
1545            is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_list_ty.elem())
1546        }
1547        // Map type recursive matching
1548        // Arrow's Map matches RisingWave's Map if the key and value types match recursively,
1549        // and the inner struct has exactly two fields named "key" and "value"
1550        (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1551            if let ArrowType::Struct(fields) = arrow_field.data_type() {
1552                if fields.len() != 2 {
1553                    return false;
1554                }
1555                let key_field = &fields[0];
1556                let value_field = &fields[1];
1557                if key_field.name() != "key" || value_field.name() != "value" {
1558                    return false;
1559                }
1560                let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1561                is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1562                    && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1563            } else {
1564                false
1565            }
1566        }
1567        // Fallback: types do not match
1568        _ => false,
1569    }
1570}
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Builds the Arrow type `struct<f1: Float64, {second_name}: Utf8>` with nullable fields.
    fn arrow_f64_utf8_struct(second_name: &str) -> ArrowType {
        let fields = vec![
            ArrowField::new("f1", ArrowType::Float64, true),
            ArrowField::new(second_name, ArrowType::Utf8, true),
        ];
        ArrowType::Struct(fields.into())
    }

    /// Builds the Arrow type `map<Utf8, Int32>` whose "entries" struct names its
    /// first field `key_name` and its second field "value".
    fn arrow_utf8_int32_map(key_name: &str) -> ArrowType {
        let entries = ArrowType::Struct(
            vec![
                ArrowField::new(key_name, ArrowType::Utf8, false),
                ArrowField::new("value", ArrowType::Int32, true),
            ]
            .into(),
        );
        ArrowType::Map(Arc::new(ArrowField::new("entries", entries, false)), false)
    }

    /// A struct matches only when every field name and field type lines up.
    #[test]
    fn test_struct_schema_match() {
        // RW side: struct<f1 Double, f2 Varchar>
        let rw_ty = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));

        // Arrow side: struct<f1: Double, f2: Utf8> -- same names, compatible types.
        let matching = arrow_f64_utf8_struct("f2");
        assert!(is_parquet_schema_match_source_schema(&matching, &rw_ty));

        // Second field is called "f3" on the Arrow side, so the names disagree.
        let renamed = arrow_f64_utf8_struct("f3");
        assert!(!is_parquet_schema_match_source_schema(&renamed, &rw_ty));
    }

    /// A list matches only when its element type matches.
    #[test]
    fn test_list_schema_match() {
        // Arrow side: list<double>
        let item = ArrowField::new("item", ArrowType::Float64, true);
        let arrow_ty = ArrowType::List(Arc::new(item));

        // RW side: list<double> is accepted ...
        let rw_f64_list = RwType::Float64.list();
        assert!(is_parquet_schema_match_source_schema(&arrow_ty, &rw_f64_list));

        // ... while list<int32> is rejected.
        let rw_i32_list = RwType::Int32.list();
        assert!(!is_parquet_schema_match_source_schema(&arrow_ty, &rw_i32_list));
    }

    /// A map matches only when key type, value type and the inner field names all agree.
    #[test]
    fn test_map_schema_match() {
        // Arrow side: map<utf8, int32>
        let arrow_ty = arrow_utf8_int32_map("key");

        // RW side: map<varchar, int32>
        let same = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_ty, &same));

        // Key type differs.
        let wrong_key = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_ty, &wrong_key));

        // Value type differs.
        let wrong_value = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_ty, &wrong_value));

        // The Arrow entries struct calls its first field "k" instead of "key".
        let misnamed = arrow_utf8_int32_map("k");
        assert!(!is_parquet_schema_match_source_schema(&misnamed, &same));
    }

    /// `BoolArray` survives a round trip through `arrow_array::BooleanArray`.
    #[test]
    fn bool() {
        let expected = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&expected);
        assert_eq!(BoolArray::from(&arrow), expected);
    }

    /// `I16Array` survives a round trip through `arrow_array::Int16Array`.
    #[test]
    fn i16() {
        let expected = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&expected);
        assert_eq!(I16Array::from(&arrow), expected);
    }

    /// `I32Array` survives a round trip through `arrow_array::Int32Array`.
    #[test]
    fn i32() {
        let expected = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&expected);
        assert_eq!(I32Array::from(&arrow), expected);
    }

    /// `I64Array` survives a round trip through `arrow_array::Int64Array`.
    #[test]
    fn i64() {
        let expected = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&expected);
        assert_eq!(I64Array::from(&arrow), expected);
    }

    /// `F32Array` survives a round trip through `arrow_array::Float32Array`.
    #[test]
    fn f32() {
        let expected = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&expected);
        assert_eq!(F32Array::from(&arrow), expected);
    }

    /// `F64Array` survives a round trip through `arrow_array::Float64Array`.
    #[test]
    fn f64() {
        let expected = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&expected);
        assert_eq!(F64Array::from(&arrow), expected);
    }

    /// Arrow `Int8Array` converts into an `i16` array, covering both `i8` bounds.
    #[test]
    fn int8() {
        let source = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&source).into();
        assert_eq!(converted, expected);
    }

    /// Arrow `UInt8Array` converts into an `i16` array.
    #[test]
    fn uint8() {
        let source = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&source).into();
        assert_eq!(converted, expected);
    }

    /// Arrow `UInt16Array` converts into an `i32` array, including `u16::MAX`.
    #[test]
    fn uint16() {
        let source = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let expected: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&source).into();
        assert_eq!(converted, expected);
    }

    /// Arrow `UInt32Array` converts into an `i64` array, including `u32::MAX`.
    #[test]
    fn uint32() {
        let source = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let expected: PrimitiveArray<i64> =
            I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&source).into();
        assert_eq!(converted, expected);
    }

    /// Arrow `UInt64Array` converts (fallibly) into a decimal array, including `u64::MAX`.
    #[test]
    fn uint64() {
        let source =
            arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let expected: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let converted: PrimitiveArray<Decimal> = (&source).try_into().unwrap();
        assert_eq!(converted, expected);
    }

    /// `DateArray` survives a round trip through `arrow_array::Date32Array`.
    #[test]
    fn date() {
        let expected = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&expected);
        assert_eq!(DateArray::from(&arrow), expected);
    }

    /// `TimeArray` survives a round trip through `arrow_array::Time64MicrosecondArray`.
    #[test]
    fn time() {
        // 24h expressed in microseconds, minus one.
        let expected =
            TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&expected);
        assert_eq!(TimeArray::from(&arrow), expected);
    }

    /// `TimestampArray` survives a round trip through `arrow_array::TimestampMicrosecondArray`.
    #[test]
    fn timestamp() {
        let expected =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&expected);
        assert_eq!(TimestampArray::from(&arrow), expected);
    }

    /// `IntervalArray` survives a round trip through `arrow_array::IntervalMonthDayNanoArray`,
    /// for both positive and negative components.
    #[test]
    fn interval() {
        let positive = Interval::from_month_day_usec(1_000_000, 1_000, 1_000_000_000);
        let negative = Interval::from_month_day_usec(-1_000_000, -1_000, -1_000_000_000);
        let expected = IntervalArray::from_iter([None, Some(positive), Some(negative)]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&expected);
        assert_eq!(IntervalArray::from(&arrow), expected);
    }

    /// `Utf8Array` survives a round trip through `arrow_array::StringArray`.
    #[test]
    fn string() {
        let expected = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&expected);
        assert_eq!(Utf8Array::from(&arrow), expected);
    }

    /// `BytesArray` survives a round trip through `arrow_array::BinaryArray`.
    #[test]
    fn binary() {
        let expected = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&expected);
        assert_eq!(BytesArray::from(&arrow), expected);
    }

    /// `DecimalArray` (including NaN and both infinities) survives a round trip through
    /// both the `LargeBinaryArray` and the `StringArray` representations.
    #[test]
    fn decimal() {
        let expected = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);

        let as_large_binary = arrow_array::LargeBinaryArray::from(&expected);
        assert_eq!(DecimalArray::try_from(&as_large_binary).unwrap(), expected);

        let as_string = arrow_array::StringArray::from(&expected);
        assert_eq!(DecimalArray::try_from(&as_string).unwrap(), expected);
    }

    /// `JsonbArray` survives a round trip through both the `LargeStringArray` and the
    /// `StringArray` representations.
    #[test]
    fn jsonb() {
        let expected = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);

        let as_large_string = arrow_array::LargeStringArray::from(&expected);
        assert_eq!(JsonbArray::try_from(&as_large_string).unwrap(), expected);

        let as_string = arrow_array::StringArray::from(&expected);
        assert_eq!(JsonbArray::try_from(&as_string).unwrap(), expected);
    }

    /// `Int256Array` survives a round trip through `arrow_array::Decimal256Array`,
    /// from small values up to the type's minimum and maximum.
    #[test]
    fn int256() {
        let samples = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let scalar_refs = samples
            .iter()
            .map(|sample| sample.as_ref().map(|value| value.as_scalar_ref()));
        let expected = Int256Array::from_iter(scalar_refs);
        let arrow = arrow_array::Decimal256Array::from(&expected);
        assert_eq!(Int256Array::from(&arrow), expected);
    }
}