risingwave_common/array/arrow/
arrow_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Converts between arrays and Apache Arrow arrays.
16//!
//! This file acts as a template for conversion code between
//! arrays and different versions of Apache Arrow.
19//!
20//! The conversion logic will be implemented for the arrow version specified in the outer mod by
21//! `super::arrow_xxx`, such as `super::arrow_array`.
22//!
//! To implement the conversion logic for another arrow version, first create a new mod file
//! and rename the corresponding arrow packages to `arrow_xxx` with a `use` clause. Then declare
//! a sub-mod whose file path is set with the attribute `#[path = "./arrow_impl.rs"]`, so that
//! the code in this template file is embedded into the new mod file and the conversion logic
//! is implemented for the corresponding arrow version.
29//!
//! An example can be seen in `arrow_default.rs`, which reads as follows:
31//! ```ignore
32//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
33//!
34//! #[allow(clippy::duplicate_mod)]
35//! #[path = "./arrow_impl.rs"]
36//! mod arrow_impl;
37//! ```
38
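// NOTE(review): this file is included as a template into several modules (see the module docs
// above); presumably not every inclusion uses every import or item, hence these allowances.
// Confirm before removing them.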
40#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53// This is important because we want to use the arrow version specified by the outer mod.
54use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
// Other imports should always use absolute paths.
56use crate::array::*;
57use crate::types::{DataType as RwDataType, *};
58use crate::util::iter_util::ZipEqFast;
59
60/// Defines how to convert RisingWave arrays to Arrow arrays.
61///
62/// This trait allows for customized conversion logic for different external systems using Arrow.
63/// The default implementation is based on the `From` implemented in this mod.
64pub trait ToArrow {
65    /// Converts RisingWave `DataChunk` to Arrow `RecordBatch` with specified schema.
66    ///
67    /// This function will try to convert the array if the type is not same with the schema.
68    fn to_record_batch(
69        &self,
70        schema: arrow_schema::SchemaRef,
71        chunk: &DataChunk,
72    ) -> Result<arrow_array::RecordBatch, ArrayError> {
73        // compact the chunk if it's not compacted
74        if !chunk.is_compacted() {
75            let c = chunk.clone();
76            return self.to_record_batch(schema, &c.compact());
77        }
78
79        // convert each column to arrow array
80        let columns: Vec<_> = chunk
81            .columns()
82            .iter()
83            .zip_eq_fast(schema.fields().iter())
84            .map(|(column, field)| self.to_array(field.data_type(), column))
85            .try_collect()?;
86
87        // create record batch
88        let opts =
89            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91            .map_err(ArrayError::to_arrow)
92    }
93
94    /// Converts RisingWave array to Arrow array.
95    fn to_array(
96        &self,
97        data_type: &arrow_schema::DataType,
98        array: &ArrayImpl,
99    ) -> Result<arrow_array::ArrayRef, ArrayError> {
100        let arrow_array = match array {
101            ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102            ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103            ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104            ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105            ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106            ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107            ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108            ArrayImpl::Date(array) => self.date_to_arrow(array),
109            ArrayImpl::Time(array) => self.time_to_arrow(array),
110            ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111            ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112            ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113            ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114            ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115            ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116            ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117            ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118            ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119            ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121            ArrayImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
122        }?;
123        if arrow_array.data_type() != data_type {
124            arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125        } else {
126            Ok(arrow_array)
127        }
128    }
129
130    #[inline]
131    fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132        Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133    }
134
135    #[inline]
136    fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137        Ok(Arc::new(arrow_array::Int16Array::from(array)))
138    }
139
140    #[inline]
141    fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142        Ok(Arc::new(arrow_array::Int32Array::from(array)))
143    }
144
145    #[inline]
146    fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147        Ok(Arc::new(arrow_array::Int64Array::from(array)))
148    }
149
150    #[inline]
151    fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152        Ok(Arc::new(arrow_array::Float32Array::from(array)))
153    }
154
155    #[inline]
156    fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157        Ok(Arc::new(arrow_array::Float64Array::from(array)))
158    }
159
160    #[inline]
161    fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162        Ok(Arc::new(arrow_array::StringArray::from(array)))
163    }
164
165    #[inline]
166    fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167        Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168    }
169
170    #[inline]
171    fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172        Ok(Arc::new(arrow_array::Date32Array::from(array)))
173    }
174
175    #[inline]
176    fn timestamp_to_arrow(
177        &self,
178        array: &TimestampArray,
179    ) -> Result<arrow_array::ArrayRef, ArrayError> {
180        Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181            array,
182        )))
183    }
184
185    #[inline]
186    fn timestamptz_to_arrow(
187        &self,
188        array: &TimestamptzArray,
189    ) -> Result<arrow_array::ArrayRef, ArrayError> {
190        Ok(Arc::new(
191            arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192        ))
193    }
194
195    #[inline]
196    fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197        Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198    }
199
200    #[inline]
201    fn interval_to_arrow(
202        &self,
203        array: &IntervalArray,
204    ) -> Result<arrow_array::ArrayRef, ArrayError> {
205        Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206            array,
207        )))
208    }
209
210    #[inline]
211    fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212        Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213    }
214
215    // Decimal values are stored as ASCII text representation in a string array.
216    #[inline]
217    fn decimal_to_arrow(
218        &self,
219        _data_type: &arrow_schema::DataType,
220        array: &DecimalArray,
221    ) -> Result<arrow_array::ArrayRef, ArrayError> {
222        Ok(Arc::new(arrow_array::StringArray::from(array)))
223    }
224
225    // JSON values are stored as text representation in a string array.
226    #[inline]
227    fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228        Ok(Arc::new(arrow_array::StringArray::from(array)))
229    }
230
231    #[inline]
232    fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233        Ok(Arc::new(arrow_array::Int64Array::from(array)))
234    }
235
236    #[inline]
237    fn list_to_arrow(
238        &self,
239        data_type: &arrow_schema::DataType,
240        array: &ListArray,
241    ) -> Result<arrow_array::ArrayRef, ArrayError> {
242        let arrow_schema::DataType::List(field) = data_type else {
243            return Err(ArrayError::to_arrow("Invalid list type"));
244        };
245        let values = self.to_array(field.data_type(), array.values())?;
246        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248        Ok(Arc::new(arrow_array::ListArray::new(
249            field.clone(),
250            offsets,
251            values,
252            nulls,
253        )))
254    }
255
256    #[inline]
257    fn struct_to_arrow(
258        &self,
259        data_type: &arrow_schema::DataType,
260        array: &StructArray,
261    ) -> Result<arrow_array::ArrayRef, ArrayError> {
262        let arrow_schema::DataType::Struct(fields) = data_type else {
263            return Err(ArrayError::to_arrow("Invalid struct type"));
264        };
265        Ok(Arc::new(arrow_array::StructArray::new(
266            fields.clone(),
267            array
268                .fields()
269                .zip_eq_fast(fields)
270                .map(|(arr, field)| self.to_array(field.data_type(), arr))
271                .try_collect::<_, _, ArrayError>()?,
272            Some(array.null_bitmap().into()),
273        )))
274    }
275
276    #[inline]
277    fn map_to_arrow(
278        &self,
279        data_type: &arrow_schema::DataType,
280        array: &MapArray,
281    ) -> Result<arrow_array::ArrayRef, ArrayError> {
282        let arrow_schema::DataType::Map(field, ordered) = data_type else {
283            return Err(ArrayError::to_arrow("Invalid map type"));
284        };
285        if *ordered {
286            return Err(ArrayError::to_arrow("Sorted map is not supported"));
287        }
288        let values = self
289            .struct_to_arrow(field.data_type(), array.as_struct())?
290            .as_struct()
291            .clone();
292        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
293        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
294        Ok(Arc::new(arrow_array::MapArray::new(
295            field.clone(),
296            offsets,
297            values,
298            nulls,
299            *ordered,
300        )))
301    }
302
303    /// Convert RisingWave data type to Arrow data type.
304    ///
305    /// This function returns a `Field` instead of `DataType` because some may be converted to
306    /// extension types which require additional metadata in the field.
307    fn to_arrow_field(
308        &self,
309        name: &str,
310        value: &DataType,
311    ) -> Result<arrow_schema::Field, ArrayError> {
312        let data_type = match value {
313            // using the inline function
314            DataType::Boolean => self.bool_type_to_arrow(),
315            DataType::Int16 => self.int16_type_to_arrow(),
316            DataType::Int32 => self.int32_type_to_arrow(),
317            DataType::Int64 => self.int64_type_to_arrow(),
318            DataType::Int256 => self.int256_type_to_arrow(),
319            DataType::Float32 => self.float32_type_to_arrow(),
320            DataType::Float64 => self.float64_type_to_arrow(),
321            DataType::Date => self.date_type_to_arrow(),
322            DataType::Time => self.time_type_to_arrow(),
323            DataType::Timestamp => self.timestamp_type_to_arrow(),
324            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
325            DataType::Interval => self.interval_type_to_arrow(),
326            DataType::Varchar => self.varchar_type_to_arrow(),
327            DataType::Bytea => self.bytea_type_to_arrow(),
328            DataType::Serial => self.serial_type_to_arrow(),
329            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
330            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
331            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
332            DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
333            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
334            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
335        };
336        Ok(arrow_schema::Field::new(name, data_type, true))
337    }
338
339    #[inline]
340    fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
341        arrow_schema::DataType::Boolean
342    }
343
344    #[inline]
345    fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
346        arrow_schema::DataType::Int16
347    }
348
349    #[inline]
350    fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
351        arrow_schema::DataType::Int32
352    }
353
354    #[inline]
355    fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
356        arrow_schema::DataType::Int64
357    }
358
359    #[inline]
360    fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
361        arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
362    }
363
364    #[inline]
365    fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
366        arrow_schema::DataType::Float32
367    }
368
369    #[inline]
370    fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
371        arrow_schema::DataType::Float64
372    }
373
374    #[inline]
375    fn date_type_to_arrow(&self) -> arrow_schema::DataType {
376        arrow_schema::DataType::Date32
377    }
378
379    #[inline]
380    fn time_type_to_arrow(&self) -> arrow_schema::DataType {
381        arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
382    }
383
384    #[inline]
385    fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
386        arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
387    }
388
389    #[inline]
390    fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
391        arrow_schema::DataType::Timestamp(
392            arrow_schema::TimeUnit::Microsecond,
393            Some("+00:00".into()),
394        )
395    }
396
397    #[inline]
398    fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
399        arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
400    }
401
402    #[inline]
403    fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
404        arrow_schema::DataType::Utf8
405    }
406
407    #[inline]
408    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
409        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
410            .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
411    }
412
413    #[inline]
414    fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
415        arrow_schema::DataType::Binary
416    }
417
418    #[inline]
419    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
420        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
421            .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
422    }
423
424    #[inline]
425    fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
426        arrow_schema::DataType::Int64
427    }
428
429    #[inline]
430    fn list_type_to_arrow(
431        &self,
432        elem_type: &DataType,
433    ) -> Result<arrow_schema::DataType, ArrayError> {
434        Ok(arrow_schema::DataType::List(Arc::new(
435            self.to_arrow_field("item", elem_type)?,
436        )))
437    }
438
439    #[inline]
440    fn struct_type_to_arrow(
441        &self,
442        fields: &StructType,
443    ) -> Result<arrow_schema::DataType, ArrayError> {
444        Ok(arrow_schema::DataType::Struct(
445            fields
446                .iter()
447                .map(|(name, ty)| self.to_arrow_field(name, ty))
448                .try_collect::<_, _, ArrayError>()?,
449        ))
450    }
451
452    #[inline]
453    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
454        let sorted = false;
455        // "key" is always non-null
456        let key = self
457            .to_arrow_field("key", map_type.key())?
458            .with_nullable(false);
459        let value = self.to_arrow_field("value", map_type.value())?;
460        Ok(arrow_schema::DataType::Map(
461            Arc::new(arrow_schema::Field::new(
462                "entries",
463                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
464                // "entries" is always non-null
465                false,
466            )),
467            sorted,
468        ))
469    }
470}
471
472/// Defines how to convert Arrow arrays to RisingWave arrays.
473#[allow(clippy::wrong_self_convention)]
474pub trait FromArrow {
475    /// Converts Arrow `RecordBatch` to RisingWave `DataChunk`.
476    fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
477        let mut columns = Vec::with_capacity(batch.num_columns());
478        for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
479            let column = Arc::new(self.from_array(field, array)?);
480            columns.push(column);
481        }
482        Ok(DataChunk::new(columns, batch.num_rows()))
483    }
484
485    /// Converts Arrow `Fields` to RisingWave `StructType`.
486    fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
487        Ok(StructType::new(
488            fields
489                .iter()
490                .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
491                .try_collect::<_, Vec<_>, ArrayError>()?,
492        ))
493    }
494
495    /// Converts Arrow `Field` to RisingWave `DataType`.
496    fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
497        use arrow_schema::DataType::*;
498        use arrow_schema::IntervalUnit::*;
499        use arrow_schema::TimeUnit::*;
500
501        // extension type
502        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
503            return self.from_extension_type(type_name, field.data_type());
504        }
505
506        Ok(match field.data_type() {
507            Boolean => DataType::Boolean,
508            Int16 => DataType::Int16,
509            Int32 => DataType::Int32,
510            Int64 => DataType::Int64,
511            Int8 => DataType::Int16,
512            UInt8 => DataType::Int16,
513            UInt16 => DataType::Int32,
514            UInt32 => DataType::Int64,
515            UInt64 => DataType::Decimal,
516            Float16 => DataType::Float32,
517            Float32 => DataType::Float32,
518            Float64 => DataType::Float64,
519            Decimal128(_, _) => DataType::Decimal,
520            Decimal256(_, _) => DataType::Int256,
521            Date32 => DataType::Date,
522            Time64(Microsecond) => DataType::Time,
523            Timestamp(Microsecond, None) => DataType::Timestamp,
524            Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
525            Timestamp(Second, None) => DataType::Timestamp,
526            Timestamp(Second, Some(_)) => DataType::Timestamptz,
527            Timestamp(Millisecond, None) => DataType::Timestamp,
528            Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
529            Timestamp(Nanosecond, None) => DataType::Timestamp,
530            Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
531            Interval(MonthDayNano) => DataType::Interval,
532            Utf8 => DataType::Varchar,
533            Binary => DataType::Bytea,
534            LargeUtf8 => self.from_large_utf8()?,
535            LargeBinary => self.from_large_binary()?,
536            List(field) => DataType::List(Box::new(self.from_field(field)?)),
537            Struct(fields) => DataType::Struct(self.from_fields(fields)?),
538            Map(field, _is_sorted) => {
539                let entries = self.from_field(field)?;
540                DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
541                    ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
542                })?)
543            }
544            t => {
545                return Err(ArrayError::from_arrow(format!(
546                    "unsupported arrow data type: {t:?}"
547                )));
548            }
549        })
550    }
551
552    /// Converts Arrow `LargeUtf8` type to RisingWave data type.
553    fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
554        Ok(DataType::Varchar)
555    }
556
557    /// Converts Arrow `LargeBinary` type to RisingWave data type.
558    fn from_large_binary(&self) -> Result<DataType, ArrayError> {
559        Ok(DataType::Bytea)
560    }
561
562    /// Converts Arrow extension type to RisingWave `DataType`.
563    fn from_extension_type(
564        &self,
565        type_name: &str,
566        physical_type: &arrow_schema::DataType,
567    ) -> Result<DataType, ArrayError> {
568        match (type_name, physical_type) {
569            ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
570            ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
571            _ => Err(ArrayError::from_arrow(format!(
572                "unsupported extension type: {type_name:?}"
573            ))),
574        }
575    }
576
577    /// Converts Arrow `Array` to RisingWave `ArrayImpl`.
578    fn from_array(
579        &self,
580        field: &arrow_schema::Field,
581        array: &arrow_array::ArrayRef,
582    ) -> Result<ArrayImpl, ArrayError> {
583        use arrow_schema::DataType::*;
584        use arrow_schema::IntervalUnit::*;
585        use arrow_schema::TimeUnit::*;
586
587        // extension type
588        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
589            return self.from_extension_array(type_name, array);
590        }
591        match array.data_type() {
592            Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
593            Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
594            Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
595            Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
596            Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
597            UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
598            UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
599            UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
600
601            UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
602            Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
603            Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
604            Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
605            Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
606            Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
607            Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
608            Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
609            Timestamp(Second, None) => {
610                self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
611            }
612            Timestamp(Second, Some(_)) => {
613                self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
614            }
615            Timestamp(Millisecond, None) => {
616                self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
617            }
618            Timestamp(Millisecond, Some(_)) => {
619                self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
620            }
621            Timestamp(Microsecond, None) => {
622                self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
623            }
624            Timestamp(Microsecond, Some(_)) => {
625                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
626            }
627            Timestamp(Nanosecond, None) => {
628                self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
629            }
630            Timestamp(Nanosecond, Some(_)) => {
631                self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
632            }
633            Interval(MonthDayNano) => {
634                self.from_interval_array(array.as_any().downcast_ref().unwrap())
635            }
636            Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
637            Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
638            LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
639            LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
640            List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
641            Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
642            Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
643            t => Err(ArrayError::from_arrow(format!(
644                "unsupported arrow data type: {t:?}",
645            ))),
646        }
647    }
648
649    /// Converts Arrow extension array to RisingWave `ArrayImpl`.
650    fn from_extension_array(
651        &self,
652        type_name: &str,
653        array: &arrow_array::ArrayRef,
654    ) -> Result<ArrayImpl, ArrayError> {
655        match type_name {
656            "arrowudf.decimal" => {
657                let array: &arrow_array::StringArray =
658                    array.as_any().downcast_ref().ok_or_else(|| {
659                        ArrayError::from_arrow(
660                            "expected string array for `arrowudf.decimal`".to_owned(),
661                        )
662                    })?;
663                Ok(ArrayImpl::Decimal(array.try_into()?))
664            }
665            "arrowudf.json" => {
666                let array: &arrow_array::StringArray =
667                    array.as_any().downcast_ref().ok_or_else(|| {
668                        ArrayError::from_arrow(
669                            "expected string array for `arrowudf.json`".to_owned(),
670                        )
671                    })?;
672                Ok(ArrayImpl::Jsonb(array.try_into()?))
673            }
674            _ => Err(ArrayError::from_arrow(format!(
675                "unsupported extension type: {type_name:?}"
676            ))),
677        }
678    }
679
680    fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
681        Ok(ArrayImpl::Bool(array.into()))
682    }
683
684    fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
685        Ok(ArrayImpl::Int16(array.into()))
686    }
687
688    fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
689        Ok(ArrayImpl::Int16(array.into()))
690    }
691
692    fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
693        Ok(ArrayImpl::Int16(array.into()))
694    }
695
696    fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
697        Ok(ArrayImpl::Int32(array.into()))
698    }
699
700    fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
701        Ok(ArrayImpl::Int64(array.into()))
702    }
703
704    fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
705        Ok(ArrayImpl::Int32(array.into()))
706    }
707
708    fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
709        Ok(ArrayImpl::Int64(array.into()))
710    }
711
712    fn from_int256_array(
713        &self,
714        array: &arrow_array::Decimal256Array,
715    ) -> Result<ArrayImpl, ArrayError> {
716        Ok(ArrayImpl::Int256(array.into()))
717    }
718
719    fn from_decimal128_array(
720        &self,
721        array: &arrow_array::Decimal128Array,
722    ) -> Result<ArrayImpl, ArrayError> {
723        Ok(ArrayImpl::Decimal(array.try_into()?))
724    }
725
726    fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
727        Ok(ArrayImpl::Decimal(array.try_into()?))
728    }
729
730    fn from_float16_array(
731        &self,
732        array: &arrow_array::Float16Array,
733    ) -> Result<ArrayImpl, ArrayError> {
734        Ok(ArrayImpl::Float32(array.try_into()?))
735    }
736
737    fn from_float32_array(
738        &self,
739        array: &arrow_array::Float32Array,
740    ) -> Result<ArrayImpl, ArrayError> {
741        Ok(ArrayImpl::Float32(array.into()))
742    }
743
744    fn from_float64_array(
745        &self,
746        array: &arrow_array::Float64Array,
747    ) -> Result<ArrayImpl, ArrayError> {
748        Ok(ArrayImpl::Float64(array.into()))
749    }
750
751    fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
752        Ok(ArrayImpl::Date(array.into()))
753    }
754
755    fn from_time64us_array(
756        &self,
757        array: &arrow_array::Time64MicrosecondArray,
758    ) -> Result<ArrayImpl, ArrayError> {
759        Ok(ArrayImpl::Time(array.into()))
760    }
761
762    fn from_timestampsecond_array(
763        &self,
764        array: &arrow_array::TimestampSecondArray,
765    ) -> Result<ArrayImpl, ArrayError> {
766        Ok(ArrayImpl::Timestamp(array.into()))
767    }
768    fn from_timestampsecond_some_array(
769        &self,
770        array: &arrow_array::TimestampSecondArray,
771    ) -> Result<ArrayImpl, ArrayError> {
772        Ok(ArrayImpl::Timestamptz(array.into()))
773    }
774
775    fn from_timestampms_array(
776        &self,
777        array: &arrow_array::TimestampMillisecondArray,
778    ) -> Result<ArrayImpl, ArrayError> {
779        Ok(ArrayImpl::Timestamp(array.into()))
780    }
781
782    fn from_timestampms_some_array(
783        &self,
784        array: &arrow_array::TimestampMillisecondArray,
785    ) -> Result<ArrayImpl, ArrayError> {
786        Ok(ArrayImpl::Timestamptz(array.into()))
787    }
788
789    fn from_timestampus_array(
790        &self,
791        array: &arrow_array::TimestampMicrosecondArray,
792    ) -> Result<ArrayImpl, ArrayError> {
793        Ok(ArrayImpl::Timestamp(array.into()))
794    }
795
796    fn from_timestampus_some_array(
797        &self,
798        array: &arrow_array::TimestampMicrosecondArray,
799    ) -> Result<ArrayImpl, ArrayError> {
800        Ok(ArrayImpl::Timestamptz(array.into()))
801    }
802
803    fn from_timestampns_array(
804        &self,
805        array: &arrow_array::TimestampNanosecondArray,
806    ) -> Result<ArrayImpl, ArrayError> {
807        Ok(ArrayImpl::Timestamp(array.into()))
808    }
809
810    fn from_timestampns_some_array(
811        &self,
812        array: &arrow_array::TimestampNanosecondArray,
813    ) -> Result<ArrayImpl, ArrayError> {
814        Ok(ArrayImpl::Timestamptz(array.into()))
815    }
816
817    fn from_interval_array(
818        &self,
819        array: &arrow_array::IntervalMonthDayNanoArray,
820    ) -> Result<ArrayImpl, ArrayError> {
821        Ok(ArrayImpl::Interval(array.into()))
822    }
823
824    fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
825        Ok(ArrayImpl::Utf8(array.into()))
826    }
827
828    fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
829        Ok(ArrayImpl::Bytea(array.into()))
830    }
831
832    fn from_large_utf8_array(
833        &self,
834        array: &arrow_array::LargeStringArray,
835    ) -> Result<ArrayImpl, ArrayError> {
836        Ok(ArrayImpl::Utf8(array.into()))
837    }
838
839    fn from_large_binary_array(
840        &self,
841        array: &arrow_array::LargeBinaryArray,
842    ) -> Result<ArrayImpl, ArrayError> {
843        Ok(ArrayImpl::Bytea(array.into()))
844    }
845
846    fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
847        use arrow_array::Array;
848        let arrow_schema::DataType::List(field) = array.data_type() else {
849            panic!("nested field types cannot be determined.");
850        };
851        Ok(ArrayImpl::List(ListArray {
852            value: Box::new(self.from_array(field, array.values())?),
853            bitmap: match array.nulls() {
854                Some(nulls) => nulls.iter().collect(),
855                None => Bitmap::ones(array.len()),
856            },
857            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
858        }))
859    }
860
861    fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
862        use arrow_array::Array;
863        let arrow_schema::DataType::Struct(fields) = array.data_type() else {
864            panic!("nested field types cannot be determined.");
865        };
866        Ok(ArrayImpl::Struct(StructArray::new(
867            self.from_fields(fields)?,
868            array
869                .columns()
870                .iter()
871                .zip_eq_fast(fields)
872                .map(|(array, field)| self.from_array(field, array).map(Arc::new))
873                .try_collect()?,
874            (0..array.len()).map(|i| array.is_valid(i)).collect(),
875        )))
876    }
877
878    fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
879        use arrow_array::Array;
880        let struct_array = self.from_struct_array(array.entries())?;
881        let list_array = ListArray {
882            value: Box::new(struct_array),
883            bitmap: match array.nulls() {
884                Some(nulls) => nulls.iter().collect(),
885                None => Bitmap::ones(array.len()),
886            },
887            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
888        };
889
890        Ok(ArrayImpl::Map(MapArray { inner: list_array }))
891    }
892}
893
894impl From<&Bitmap> for arrow_buffer::NullBuffer {
895    fn from(bitmap: &Bitmap) -> Self {
896        bitmap.iter().collect()
897    }
898}
899
/// Implement bi-directional `From` between concrete array types.
///
/// Each invocation generates three impls for a (RisingWave array, Arrow array) pair:
/// RisingWave -> Arrow, Arrow -> RisingWave, and `&[Arrow array]` -> RisingWave. The last one
/// concatenates the input arrays in order.
///
/// - Without a marker, elements are passed through unchanged.
/// - With `@map`, each element is translated through [`FromIntoArrow`].
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                FromIterator::from_iter(array.iter())
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                FromIterator::from_iter(array.iter())
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                FromIterator::from_iter(arrays.iter().flat_map(|chunk| chunk.iter()))
            }
        }
    };
    // Convert element values using `FromIntoArrow`.
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                FromIterator::from_iter(
                    array
                        .iter()
                        .map(|item| item.map(|value| value.into_arrow())),
                )
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                let from_arrow =
                    <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow;
                FromIterator::from_iter(array.iter().map(|item| item.map(from_arrow)))
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                let from_arrow =
                    <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow;
                FromIterator::from_iter(
                    arrays
                        .iter()
                        .flat_map(|chunk| chunk.iter())
                        .map(|item| item.map(from_arrow)),
                )
            }
        }
    };
}
953
/// Used to convert different types.
///
/// Implements bi-directional `From` between a RisingWave primitive array and an Arrow primitive
/// array with a different element type. Each element is converted with an `as` cast:
/// `$FromType` is the RisingWave element type and `$ToType` is the Arrow element type.
///
/// NOTE: `as` never fails. A narrowing cast silently wraps values that do not fit the target
/// type. Examples are `i16` -> `i8` and `i16` -> `u8` in the RisingWave -> Arrow direction.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                // Collect straight from the iterator; no intermediate `Vec` is needed.
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
984
/// Implements bi-directional `From` between a RisingWave timestamp array and an Arrow timestamp
/// array with a fixed [`TimeUnit`].
///
/// Elements go through [`FromIntoArrowWithUnit`]. The macro passes `$time_unit` so that the
/// integer stored by Arrow is read (or written) in the matching unit. A conversion from a slice
/// of Arrow arrays is also generated; those arrays are concatenated in order.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|o| o.map(|v| v.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(
                                v, $time_unit,
                            )
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter())
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(
                                v, $time_unit,
                            )
                        })
                    })
                    .collect()
            }
        }
    };
}
1021
1022converts!(BoolArray, arrow_array::BooleanArray);
1023converts!(I16Array, arrow_array::Int16Array);
1024converts!(I32Array, arrow_array::Int32Array);
1025converts!(I64Array, arrow_array::Int64Array);
1026converts!(F32Array, arrow_array::Float32Array, @map);
1027converts!(F64Array, arrow_array::Float64Array, @map);
1028converts!(BytesArray, arrow_array::BinaryArray);
1029converts!(BytesArray, arrow_array::LargeBinaryArray);
1030converts!(Utf8Array, arrow_array::StringArray);
1031converts!(Utf8Array, arrow_array::LargeStringArray);
1032converts!(DateArray, arrow_array::Date32Array, @map);
1033converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
1034converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
1035converts!(SerialArray, arrow_array::Int64Array, @map);
1036
1037converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
1038converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
1039converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
1040converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);
1041
1042converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1043converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
1044converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1045converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1046
1047converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1048converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
1049converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1050converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1051
/// Element-level conversion between a RisingWave scalar and its native Arrow representation.
trait FromIntoArrow {
    /// The element type stored in the corresponding Arrow array.
    type ArrowType;

    /// Builds the RisingWave value from one Arrow element.
    fn from_arrow(arrow_value: Self::ArrowType) -> Self;

    /// Produces the Arrow element for this RisingWave value.
    fn into_arrow(self) -> Self::ArrowType;
}
1059
/// Element-level conversion between a RisingWave scalar and an Arrow value whose meaning
/// depends on a unit parameter (used for timestamps of different precisions).
trait FromIntoArrowWithUnit {
    /// The element type stored in the corresponding Arrow array.
    type ArrowType;
    /// The unit selector passed to both directions.
    ///
    /// Despite its name, this selects a time unit rather than a timestamp type.
    type TimestampType;

    /// Builds the RisingWave value from one Arrow element expressed in `time_unit`.
    fn from_arrow_with_unit(arrow_value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;

    /// Produces the Arrow element for this value, expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1069
1070impl FromIntoArrow for Serial {
1071    type ArrowType = i64;
1072
1073    fn from_arrow(value: Self::ArrowType) -> Self {
1074        value.into()
1075    }
1076
1077    fn into_arrow(self) -> Self::ArrowType {
1078        self.into()
1079    }
1080}
1081
1082impl FromIntoArrow for F32 {
1083    type ArrowType = f32;
1084
1085    fn from_arrow(value: Self::ArrowType) -> Self {
1086        value.into()
1087    }
1088
1089    fn into_arrow(self) -> Self::ArrowType {
1090        self.into()
1091    }
1092}
1093
1094impl FromIntoArrow for F64 {
1095    type ArrowType = f64;
1096
1097    fn from_arrow(value: Self::ArrowType) -> Self {
1098        value.into()
1099    }
1100
1101    fn into_arrow(self) -> Self::ArrowType {
1102        self.into()
1103    }
1104}
1105
1106impl FromIntoArrow for Date {
1107    type ArrowType = i32;
1108
1109    fn from_arrow(value: Self::ArrowType) -> Self {
1110        Date(arrow_array::types::Date32Type::to_naive_date(value))
1111    }
1112
1113    fn into_arrow(self) -> Self::ArrowType {
1114        arrow_array::types::Date32Type::from_naive_date(self.0)
1115    }
1116}
1117
1118impl FromIntoArrow for Time {
1119    type ArrowType = i64;
1120
1121    fn from_arrow(value: Self::ArrowType) -> Self {
1122        Time(
1123            NaiveTime::from_num_seconds_from_midnight_opt(
1124                (value / 1_000_000) as _,
1125                (value % 1_000_000 * 1000) as _,
1126            )
1127            .unwrap(),
1128        )
1129    }
1130
1131    fn into_arrow(self) -> Self::ArrowType {
1132        self.0
1133            .signed_duration_since(NaiveTime::default())
1134            .num_microseconds()
1135            .unwrap()
1136    }
1137}
1138
1139impl FromIntoArrowWithUnit for Timestamp {
1140    type ArrowType = i64;
1141    type TimestampType = TimeUnit;
1142
1143    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1144        match time_unit {
1145            TimeUnit::Second => {
1146                Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1147            }
1148            TimeUnit::Millisecond => {
1149                Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1150            }
1151            TimeUnit::Microsecond => {
1152                Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1153            }
1154            TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1155        }
1156    }
1157
1158    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1159        match time_unit {
1160            TimeUnit::Second => self.0.and_utc().timestamp(),
1161            TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1162            TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1163            TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1164        }
1165    }
1166}
1167
1168impl FromIntoArrowWithUnit for Timestamptz {
1169    type ArrowType = i64;
1170    type TimestampType = TimeUnit;
1171
1172    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1173        match time_unit {
1174            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1175            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1176            TimeUnit::Microsecond => Timestamptz::from_micros(value),
1177            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1178        }
1179    }
1180
1181    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1182        match time_unit {
1183            TimeUnit::Second => self.timestamp(),
1184            TimeUnit::Millisecond => self.timestamp_millis(),
1185            TimeUnit::Microsecond => self.timestamp_micros(),
1186            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1187        }
1188    }
1189}
1190
1191impl FromIntoArrow for Interval {
1192    type ArrowType = ArrowIntervalType;
1193
1194    fn from_arrow(value: Self::ArrowType) -> Self {
1195        Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1196    }
1197
1198    fn into_arrow(self) -> Self::ArrowType {
1199        ArrowIntervalType {
1200            months: self.months(),
1201            days: self.days(),
1202            // TODO: this may overflow and we need `try_into`
1203            nanoseconds: self.usecs() * 1000,
1204        }
1205    }
1206}
1207
1208impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1209    fn from(array: &DecimalArray) -> Self {
1210        let mut builder =
1211            arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1212        for value in array.iter() {
1213            builder.append_option(value.map(|d| d.to_string()));
1214        }
1215        builder.finish()
1216    }
1217}
1218
1219impl From<&DecimalArray> for arrow_array::StringArray {
1220    fn from(array: &DecimalArray) -> Self {
1221        let mut builder =
1222            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1223        for value in array.iter() {
1224            builder.append_option(value.map(|d| d.to_string()));
1225        }
1226        builder.finish()
1227    }
1228}
1229
1230// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
1231impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1232    type Error = ArrayError;
1233
1234    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1235        if array.scale() < 0 {
1236            bail!("support negative scale for arrow decimal")
1237        }
1238        let from_arrow = |value| {
1239            const NAN: i128 = i128::MIN + 1;
1240            let res = match value {
1241                NAN => Decimal::NaN,
1242                i128::MAX => Decimal::PositiveInf,
1243                i128::MIN => Decimal::NegativeInf,
1244                _ => Decimal::Normalized(
1245                    rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1246                        .map_err(ArrayError::internal)?,
1247                ),
1248            };
1249            Ok(res)
1250        };
1251        array
1252            .iter()
1253            .map(|o| o.map(from_arrow).transpose())
1254            .collect::<Result<Self, Self::Error>>()
1255    }
1256}
1257
1258// Since RisingWave does not support UInt type, convert UInt64Array to Decimal.
1259impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1260    type Error = ArrayError;
1261
1262    fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1263        let from_arrow = |value| {
1264            // Convert the value to a Decimal with scale 0
1265            let res = Decimal::from(value);
1266            Ok(res)
1267        };
1268
1269        // Map over the array and convert each value
1270        array
1271            .iter()
1272            .map(|o| o.map(from_arrow).transpose())
1273            .collect::<Result<Self, Self::Error>>()
1274    }
1275}
1276
1277impl TryFrom<&arrow_array::Float16Array> for F32Array {
1278    type Error = ArrayError;
1279
1280    fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1281        let from_arrow = |value| Ok(f32::from(value));
1282
1283        array
1284            .iter()
1285            .map(|o| o.map(from_arrow).transpose())
1286            .collect::<Result<Self, Self::Error>>()
1287    }
1288}
1289
1290impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1291    type Error = ArrayError;
1292
1293    fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1294        array
1295            .iter()
1296            .map(|o| {
1297                o.map(|s| {
1298                    let s = std::str::from_utf8(s)
1299                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1300                    s.parse()
1301                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1302                })
1303                .transpose()
1304            })
1305            .try_collect()
1306    }
1307}
1308
1309impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1310    type Error = ArrayError;
1311
1312    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1313        array
1314            .iter()
1315            .map(|o| {
1316                o.map(|s| {
1317                    s.parse()
1318                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1319                })
1320                .transpose()
1321            })
1322            .try_collect()
1323    }
1324}
1325
1326impl From<&JsonbArray> for arrow_array::StringArray {
1327    fn from(array: &JsonbArray) -> Self {
1328        let mut builder =
1329            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1330        for value in array.iter() {
1331            match value {
1332                Some(jsonb) => {
1333                    write!(&mut builder, "{}", jsonb).unwrap();
1334                    builder.append_value("");
1335                }
1336                None => builder.append_null(),
1337            }
1338        }
1339        builder.finish()
1340    }
1341}
1342
1343impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1344    type Error = ArrayError;
1345
1346    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1347        array
1348            .iter()
1349            .map(|o| {
1350                o.map(|s| {
1351                    s.parse()
1352                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1353                })
1354                .transpose()
1355            })
1356            .try_collect()
1357    }
1358}
1359
1360impl From<&IntervalArray> for arrow_array::StringArray {
1361    fn from(array: &IntervalArray) -> Self {
1362        let mut builder =
1363            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1364        for value in array.iter() {
1365            match value {
1366                Some(interval) => {
1367                    write!(&mut builder, "{}", interval).unwrap();
1368                    builder.append_value("");
1369                }
1370                None => builder.append_null(),
1371            }
1372        }
1373        builder.finish()
1374    }
1375}
1376
1377impl From<&JsonbArray> for arrow_array::LargeStringArray {
1378    fn from(array: &JsonbArray) -> Self {
1379        let mut builder =
1380            arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1381        for value in array.iter() {
1382            match value {
1383                Some(jsonb) => {
1384                    write!(&mut builder, "{}", jsonb).unwrap();
1385                    builder.append_value("");
1386                }
1387                None => builder.append_null(),
1388            }
1389        }
1390        builder.finish()
1391    }
1392}
1393
1394impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1395    type Error = ArrayError;
1396
1397    fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1398        array
1399            .iter()
1400            .map(|o| {
1401                o.map(|s| {
1402                    s.parse()
1403                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1404                })
1405                .transpose()
1406            })
1407            .try_collect()
1408    }
1409}
1410
1411impl From<arrow_buffer::i256> for Int256 {
1412    fn from(value: arrow_buffer::i256) -> Self {
1413        let buffer = value.to_be_bytes();
1414        Int256::from_be_bytes(buffer)
1415    }
1416}
1417
1418impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1419    fn from(val: Int256Ref<'a>) -> Self {
1420        let buffer = val.to_be_bytes();
1421        arrow_buffer::i256::from_be_bytes(buffer)
1422    }
1423}
1424
1425impl From<&Int256Array> for arrow_array::Decimal256Array {
1426    fn from(array: &Int256Array) -> Self {
1427        array
1428            .iter()
1429            .map(|o| o.map(arrow_buffer::i256::from))
1430            .collect()
1431    }
1432}
1433
1434impl From<&arrow_array::Decimal256Array> for Int256Array {
1435    fn from(array: &arrow_array::Decimal256Array) -> Self {
1436        let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1437
1438        values
1439            .iter()
1440            .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1441            .collect()
1442    }
1443}
1444
1445/// This function checks whether the schema of a Parquet file matches the user-defined schema in RisingWave.
1446/// It handles the following special cases:
1447/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `Timestamp` type.
1448/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `Timestamptz` type.
1449/// - Since RisingWave does not have an `UInt` type:
1450///   - Arrow's `UInt8` matches with RisingWave's `Int16`.
1451///   - Arrow's `UInt16` matches with RisingWave's `Int32`.
1452///   - Arrow's `UInt32` matches with RisingWave's `Int64`.
1453///   - Arrow's `UInt64` matches with RisingWave's `Decimal`.
1454/// - Arrow's `Float16` matches with RisingWave's `Float32`.
1455///
1456/// Nested data type matching:
1457/// - Struct: Arrow's `Struct` type matches with RisingWave's `Struct` type recursively, requiring the same field names and types.
1458/// - List: Arrow's `List` type matches with RisingWave's `List` type recursively, requiring the same element type.
1459/// - Map: Arrow's `Map` type matches with RisingWave's `Map` type recursively, requiring the key and value types to match, and the inner struct must have exactly two fields named "key" and "value".
1460pub fn is_parquet_schema_match_source_schema(
1461    arrow_data_type: &arrow_schema::DataType,
1462    rw_data_type: &crate::types::DataType,
1463) -> bool {
1464    use arrow_schema::DataType as ArrowType;
1465
1466    use crate::types::{DataType as RwType, MapType, StructType};
1467
1468    match (arrow_data_type, rw_data_type) {
1469        // Primitive type matching and special cases
1470        (ArrowType::Boolean, RwType::Boolean)
1471        | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1472        | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1473        | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1474        | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1475        | (ArrowType::Decimal256(_, _), RwType::Int256)
1476        | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1477        | (ArrowType::Float64, RwType::Float64)
1478        | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1479        | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1480        | (ArrowType::Date32, RwType::Date)
1481        | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1482        | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1483        | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1484        | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1485
1486        // Struct type recursive matching
1487        // Arrow's Struct matches RisingWave's Struct if all field names and types match recursively
1488        (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1489            if arrow_fields.len() != rw_struct.len() {
1490                return false;
1491            }
1492            for (arrow_field, (rw_name, rw_ty)) in arrow_fields.iter().zip_eq_fast(rw_struct.iter())
1493            {
1494                if arrow_field.name() != rw_name {
1495                    return false;
1496                }
1497                if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1498                    return false;
1499                }
1500            }
1501            true
1502        }
1503        // List type recursive matching
1504        // Arrow's List matches RisingWave's List if the element type matches recursively
1505        (ArrowType::List(arrow_field), RwType::List(rw_elem_ty)) => {
1506            is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_elem_ty)
1507        }
1508        // Map type recursive matching
1509        // Arrow's Map matches RisingWave's Map if the key and value types match recursively,
1510        // and the inner struct has exactly two fields named "key" and "value"
1511        (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1512            if let ArrowType::Struct(fields) = arrow_field.data_type() {
1513                if fields.len() != 2 {
1514                    return false;
1515                }
1516                let key_field = &fields[0];
1517                let value_field = &fields[1];
1518                if key_field.name() != "key" || value_field.name() != "value" {
1519                    return false;
1520                }
1521                let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1522                is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1523                    && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1524            } else {
1525                false
1526            }
1527        }
1528        // Fallback: types do not match
1529        _ => false,
1530    }
1531}
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Builds the Arrow type `map<utf8, int32>` whose `entries` struct names its key field
    /// `key_name`. The value field is always named `value`.
    ///
    /// Used to test both the matching case (`"key"`) and a mismatched key field name.
    fn arrow_utf8_int32_map(key_name: &str) -> ArrowType {
        ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new(key_name, ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        )
    }

    /// Struct types match only when field names and field types line up.
    #[test]
    fn test_struct_schema_match() {
        // Arrow: struct<f1: Double, f2: Utf8>
        let arrow_struct = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        // RW: struct<f1 Double, f2 Varchar>
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct,
            &rw_struct
        ));

        // Field names do not match: Arrow has `f3` where RW expects `f2`.
        let arrow_struct2 = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_struct2,
            &rw_struct
        ));
    }

    /// List types match only when their element types match.
    #[test]
    fn test_list_schema_match() {
        // Arrow: list<double>. `FieldRef` is an `Arc<Field>`, so build the `Arc` directly
        // instead of allocating a `Box` and converting it.
        let arrow_list =
            ArrowType::List(Arc::new(ArrowField::new("item", ArrowType::Float64, true)));

        // RW: list<double> — element types match.
        let rw_list = RwType::List(Box::new(RwType::Float64));
        assert!(is_parquet_schema_match_source_schema(&arrow_list, &rw_list));

        // RW: list<int> — element types do not match.
        let rw_list2 = RwType::List(Box::new(RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_list2
        ));
    }

    /// Map types match only when key type, value type and the inner entry field names match.
    #[test]
    fn test_map_schema_match() {
        // Arrow: map<utf8, int32>
        let arrow_map = arrow_utf8_int32_map("key");
        // RW: map<varchar, int32>
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        // Key type does not match
        let rw_map2 = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map2));

        // Value type does not match
        let rw_map3 = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map3));

        // Arrow inner struct field name does not match (`k` instead of `key`)
        let arrow_map2 = arrow_utf8_int32_map("k");
        assert!(!is_parquet_schema_match_source_schema(&arrow_map2, &rw_map));
    }

    // Round-trip tests: RW array -> Arrow array -> RW array must yield the original array.

    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    // One-way tests: Arrow integer types without an exact RW counterpart are widened to a
    // larger RW type. Boundary values are used to show that no value is truncated.

    /// Arrow `Int8` is read into RW `i16`; `i8::MIN`/`i8::MAX` survive.
    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt8` is read into RW `i16`.
    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt16` is read into RW `i32`; `u16::MAX` survives.
    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt32` is read into RW `i64`; `u32::MAX` survives.
    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt64` is read into RW `Decimal` (via a fallible conversion), because
    /// `u64::MAX` does not fit in any RW signed integer type used here.
    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    /// Dates on both sides of the CE epoch round-trip through Arrow `Date32`.
    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    /// The last representable microsecond of a day round-trips through Arrow `Time64(us)`.
    #[test]
    fn time() {
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    /// Large positive and negative month/day/usec components round-trip through Arrow
    /// `Interval(MonthDayNano)`.
    #[test]
    fn interval() {
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BytesArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    /// Decimals, including the special values NaN and +/-Inf, round-trip through both the
    /// Arrow `LargeBinary` and `Utf8` encodings (the reverse conversion is fallible).
    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    /// JSON values of every kind round-trip through both the Arrow `LargeUtf8` and `Utf8`
    /// encodings (the reverse conversion is fallible).
    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    /// 256-bit integers spanning many magnitudes, including the type's min and max, round-trip
    /// through Arrow `Decimal256`.
    #[test]
    fn int256() {
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}