risingwave_common/array/arrow/
arrow_impl.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Converts between arrays and Apache Arrow arrays.
16//!
//! This file acts as a template file for conversion code between
//! arrays and different versions of Apache Arrow.
19//!
20//! The conversion logic will be implemented for the arrow version specified in the outer mod by
21//! `super::arrow_xxx`, such as `super::arrow_array`.
22//!
23//! When we want to implement the conversion logic for an arrow version, we first
24//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx`
25//! using the `use` clause, and then declare a sub-mod and set its file path with attribute
26//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to
27//! the new mod file, and the conversion logic can be implemented for the corresponding arrow
28//! version.
29//!
//! An example can be seen in `arrow_default.rs`, which is reproduced below:
31//! ```ignore
32//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
33//!
34//! #[allow(clippy::duplicate_mod)]
35//! #[path = "./arrow_impl.rs"]
36//! mod arrow_impl;
37//! ```
38
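// TODO: Is this a bug? It is unclear why these lints have to be allowed here. Presumably it is
// because this template is embedded into several modules that each use only part of it
// (to be confirmed).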
40#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53// This is important because we want to use the arrow version specified by the outer mod.
54use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
// Other imports should always use the absolute path.
56use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60/// Defines how to convert RisingWave arrays to Arrow arrays.
61///
62/// This trait allows for customized conversion logic for different external systems using Arrow.
63/// The default implementation is based on the `From` implemented in this mod.
64pub trait ToArrow {
65    /// Converts RisingWave `DataChunk` to Arrow `RecordBatch` with specified schema.
66    ///
67    /// This function will try to convert the array if the type is not same with the schema.
68    fn to_record_batch(
69        &self,
70        schema: arrow_schema::SchemaRef,
71        chunk: &DataChunk,
72    ) -> Result<arrow_array::RecordBatch, ArrayError> {
73        // compact the chunk if it's not compacted
74        if !chunk.is_vis_compacted() {
75            let c = chunk.clone();
76            return self.to_record_batch(schema, &c.compact_vis());
77        }
78
79        // convert each column to arrow array
80        let columns: Vec<_> = chunk
81            .columns()
82            .iter()
83            .zip_eq_fast(schema.fields().iter())
84            .map(|(column, field)| self.to_array(field.data_type(), column))
85            .try_collect()?;
86
87        // create record batch
88        let opts =
89            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91            .map_err(ArrayError::to_arrow)
92    }
93
94    /// Converts RisingWave array to Arrow array.
95    fn to_array(
96        &self,
97        data_type: &arrow_schema::DataType,
98        array: &ArrayImpl,
99    ) -> Result<arrow_array::ArrayRef, ArrayError> {
100        let arrow_array = match array {
101            ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102            ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103            ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104            ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105            ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106            ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107            ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108            ArrayImpl::Date(array) => self.date_to_arrow(array),
109            ArrayImpl::Time(array) => self.time_to_arrow(array),
110            ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111            ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112            ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113            ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114            ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115            ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116            ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117            ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118            ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119            ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121            ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122        }?;
123        if arrow_array.data_type() != data_type {
124            arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125        } else {
126            Ok(arrow_array)
127        }
128    }
129
130    #[inline]
131    fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132        Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133    }
134
135    #[inline]
136    fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137        Ok(Arc::new(arrow_array::Int16Array::from(array)))
138    }
139
140    #[inline]
141    fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142        Ok(Arc::new(arrow_array::Int32Array::from(array)))
143    }
144
145    #[inline]
146    fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147        Ok(Arc::new(arrow_array::Int64Array::from(array)))
148    }
149
150    #[inline]
151    fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152        Ok(Arc::new(arrow_array::Float32Array::from(array)))
153    }
154
155    #[inline]
156    fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157        Ok(Arc::new(arrow_array::Float64Array::from(array)))
158    }
159
160    #[inline]
161    fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162        Ok(Arc::new(arrow_array::StringArray::from(array)))
163    }
164
165    #[inline]
166    fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167        Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168    }
169
170    #[inline]
171    fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172        Ok(Arc::new(arrow_array::Date32Array::from(array)))
173    }
174
175    #[inline]
176    fn timestamp_to_arrow(
177        &self,
178        array: &TimestampArray,
179    ) -> Result<arrow_array::ArrayRef, ArrayError> {
180        Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181            array,
182        )))
183    }
184
185    #[inline]
186    fn timestamptz_to_arrow(
187        &self,
188        array: &TimestamptzArray,
189    ) -> Result<arrow_array::ArrayRef, ArrayError> {
190        Ok(Arc::new(
191            arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192        ))
193    }
194
195    #[inline]
196    fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197        Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198    }
199
200    #[inline]
201    fn interval_to_arrow(
202        &self,
203        array: &IntervalArray,
204    ) -> Result<arrow_array::ArrayRef, ArrayError> {
205        Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206            array,
207        )))
208    }
209
210    #[inline]
211    fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212        Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213    }
214
215    // Decimal values are stored as ASCII text representation in a string array.
216    #[inline]
217    fn decimal_to_arrow(
218        &self,
219        _data_type: &arrow_schema::DataType,
220        array: &DecimalArray,
221    ) -> Result<arrow_array::ArrayRef, ArrayError> {
222        Ok(Arc::new(arrow_array::StringArray::from(array)))
223    }
224
225    // JSON values are stored as text representation in a string array.
226    #[inline]
227    fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228        Ok(Arc::new(arrow_array::StringArray::from(array)))
229    }
230
231    #[inline]
232    fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233        Ok(Arc::new(arrow_array::Int64Array::from(array)))
234    }
235
236    #[inline]
237    fn list_to_arrow(
238        &self,
239        data_type: &arrow_schema::DataType,
240        array: &ListArray,
241    ) -> Result<arrow_array::ArrayRef, ArrayError> {
242        let arrow_schema::DataType::List(field) = data_type else {
243            return Err(ArrayError::to_arrow("Invalid list type"));
244        };
245        let values = self.to_array(field.data_type(), array.values())?;
246        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248        Ok(Arc::new(arrow_array::ListArray::new(
249            field.clone(),
250            offsets,
251            values,
252            nulls,
253        )))
254    }
255
256    #[inline]
257    fn vector_to_arrow(
258        &self,
259        data_type: &arrow_schema::DataType,
260        array: &VectorArray,
261    ) -> Result<arrow_array::ArrayRef, ArrayError> {
262        let arrow_schema::DataType::List(field) = data_type else {
263            return Err(ArrayError::to_arrow("Invalid list type"));
264        };
265        if field.data_type() != &arrow_schema::DataType::Float32 {
266            return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267        }
268        let values = Arc::new(arrow_array::Float32Array::from(
269            array.as_raw_slice().to_vec(),
270        ));
271        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273        Ok(Arc::new(arrow_array::ListArray::new(
274            field.clone(),
275            offsets,
276            values,
277            nulls,
278        )))
279    }
280
281    #[inline]
282    fn struct_to_arrow(
283        &self,
284        data_type: &arrow_schema::DataType,
285        array: &StructArray,
286    ) -> Result<arrow_array::ArrayRef, ArrayError> {
287        let arrow_schema::DataType::Struct(fields) = data_type else {
288            return Err(ArrayError::to_arrow("Invalid struct type"));
289        };
290        Ok(Arc::new(arrow_array::StructArray::new(
291            fields.clone(),
292            array
293                .fields()
294                .zip_eq_fast(fields)
295                .map(|(arr, field)| self.to_array(field.data_type(), arr))
296                .try_collect::<_, _, ArrayError>()?,
297            Some(array.null_bitmap().into()),
298        )))
299    }
300
301    #[inline]
302    fn map_to_arrow(
303        &self,
304        data_type: &arrow_schema::DataType,
305        array: &MapArray,
306    ) -> Result<arrow_array::ArrayRef, ArrayError> {
307        let arrow_schema::DataType::Map(field, ordered) = data_type else {
308            return Err(ArrayError::to_arrow("Invalid map type"));
309        };
310        if *ordered {
311            return Err(ArrayError::to_arrow("Sorted map is not supported"));
312        }
313        let values = self
314            .struct_to_arrow(field.data_type(), array.as_struct())?
315            .as_struct()
316            .clone();
317        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
318        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
319        Ok(Arc::new(arrow_array::MapArray::new(
320            field.clone(),
321            offsets,
322            values,
323            nulls,
324            *ordered,
325        )))
326    }
327
328    /// Convert RisingWave data type to Arrow data type.
329    ///
330    /// This function returns a `Field` instead of `DataType` because some may be converted to
331    /// extension types which require additional metadata in the field.
332    fn to_arrow_field(
333        &self,
334        name: &str,
335        value: &DataType,
336    ) -> Result<arrow_schema::Field, ArrayError> {
337        let data_type = match value {
338            // using the inline function
339            DataType::Boolean => self.bool_type_to_arrow(),
340            DataType::Int16 => self.int16_type_to_arrow(),
341            DataType::Int32 => self.int32_type_to_arrow(),
342            DataType::Int64 => self.int64_type_to_arrow(),
343            DataType::Int256 => self.int256_type_to_arrow(),
344            DataType::Float32 => self.float32_type_to_arrow(),
345            DataType::Float64 => self.float64_type_to_arrow(),
346            DataType::Date => self.date_type_to_arrow(),
347            DataType::Time => self.time_type_to_arrow(),
348            DataType::Timestamp => self.timestamp_type_to_arrow(),
349            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
350            DataType::Interval => self.interval_type_to_arrow(),
351            DataType::Varchar => self.varchar_type_to_arrow(),
352            DataType::Bytea => self.bytea_type_to_arrow(),
353            DataType::Serial => self.serial_type_to_arrow(),
354            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
355            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
356            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
357            DataType::List(list) => self.list_type_to_arrow(list)?,
358            DataType::Map(map) => self.map_type_to_arrow(map)?,
359            DataType::Vector(_) => self.vector_type_to_arrow()?,
360        };
361        Ok(arrow_schema::Field::new(name, data_type, true))
362    }
363
364    #[inline]
365    fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
366        arrow_schema::DataType::Boolean
367    }
368
369    #[inline]
370    fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
371        arrow_schema::DataType::Int16
372    }
373
374    #[inline]
375    fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
376        arrow_schema::DataType::Int32
377    }
378
379    #[inline]
380    fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
381        arrow_schema::DataType::Int64
382    }
383
384    #[inline]
385    fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
386        arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
387    }
388
389    #[inline]
390    fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
391        arrow_schema::DataType::Float32
392    }
393
394    #[inline]
395    fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
396        arrow_schema::DataType::Float64
397    }
398
399    #[inline]
400    fn date_type_to_arrow(&self) -> arrow_schema::DataType {
401        arrow_schema::DataType::Date32
402    }
403
404    #[inline]
405    fn time_type_to_arrow(&self) -> arrow_schema::DataType {
406        arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
407    }
408
409    #[inline]
410    fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
411        arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
412    }
413
414    #[inline]
415    fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
416        arrow_schema::DataType::Timestamp(
417            arrow_schema::TimeUnit::Microsecond,
418            Some("+00:00".into()),
419        )
420    }
421
422    #[inline]
423    fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
424        arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
425    }
426
427    #[inline]
428    fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
429        arrow_schema::DataType::Utf8
430    }
431
432    #[inline]
433    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
434        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
435            .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
436    }
437
438    #[inline]
439    fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
440        arrow_schema::DataType::Binary
441    }
442
443    #[inline]
444    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
445        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
446            .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
447    }
448
449    #[inline]
450    fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
451        arrow_schema::DataType::Int64
452    }
453
454    #[inline]
455    fn list_type_to_arrow(
456        &self,
457        list_type: &ListType,
458    ) -> Result<arrow_schema::DataType, ArrayError> {
459        Ok(arrow_schema::DataType::List(Arc::new(
460            self.to_arrow_field("item", list_type.elem())?,
461        )))
462    }
463
464    #[inline]
465    fn struct_type_to_arrow(
466        &self,
467        fields: &StructType,
468    ) -> Result<arrow_schema::DataType, ArrayError> {
469        Ok(arrow_schema::DataType::Struct(
470            fields
471                .iter()
472                .map(|(name, ty)| self.to_arrow_field(name, ty))
473                .try_collect::<_, _, ArrayError>()?,
474        ))
475    }
476
477    #[inline]
478    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
479        let sorted = false;
480        // "key" is always non-null
481        let key = self
482            .to_arrow_field("key", map_type.key())?
483            .with_nullable(false);
484        let value = self.to_arrow_field("value", map_type.value())?;
485        Ok(arrow_schema::DataType::Map(
486            Arc::new(arrow_schema::Field::new(
487                "entries",
488                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
489                // "entries" is always non-null
490                false,
491            )),
492            sorted,
493        ))
494    }
495
496    #[inline]
497    fn vector_type_to_arrow(&self) -> Result<arrow_schema::DataType, ArrayError> {
498        Ok(arrow_schema::DataType::List(Arc::new(
499            self.to_arrow_field("item", &VECTOR_ITEM_TYPE)?,
500        )))
501    }
502}
503
504/// Defines how to convert Arrow arrays to RisingWave arrays.
505#[allow(clippy::wrong_self_convention)]
506pub trait FromArrow {
507    /// Converts Arrow `RecordBatch` to RisingWave `DataChunk`.
508    fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
509        let mut columns = Vec::with_capacity(batch.num_columns());
510        for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
511            let column = Arc::new(self.from_array(field, array)?);
512            columns.push(column);
513        }
514        Ok(DataChunk::new(columns, batch.num_rows()))
515    }
516
517    /// Converts Arrow `Fields` to RisingWave `StructType`.
518    fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
519        Ok(StructType::new(
520            fields
521                .iter()
522                .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
523                .try_collect::<_, Vec<_>, ArrayError>()?,
524        ))
525    }
526
527    /// Converts Arrow `Field` to RisingWave `DataType`.
528    fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
529        use arrow_schema::DataType::*;
530        use arrow_schema::IntervalUnit::*;
531        use arrow_schema::TimeUnit::*;
532
533        // extension type
534        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
535            return self.from_extension_type(type_name, field.data_type());
536        }
537
538        Ok(match field.data_type() {
539            Boolean => DataType::Boolean,
540            Int16 => DataType::Int16,
541            Int32 => DataType::Int32,
542            Int64 => DataType::Int64,
543            Int8 => DataType::Int16,
544            UInt8 => DataType::Int16,
545            UInt16 => DataType::Int32,
546            UInt32 => DataType::Int64,
547            UInt64 => DataType::Decimal,
548            Float16 => DataType::Float32,
549            Float32 => DataType::Float32,
550            Float64 => DataType::Float64,
551            Decimal128(_, _) => DataType::Decimal,
552            Decimal256(_, _) => DataType::Int256,
553            Date32 => DataType::Date,
554            Time64(Microsecond) => DataType::Time,
555            Timestamp(Microsecond, None) => DataType::Timestamp,
556            Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
557            Timestamp(Second, None) => DataType::Timestamp,
558            Timestamp(Second, Some(_)) => DataType::Timestamptz,
559            Timestamp(Millisecond, None) => DataType::Timestamp,
560            Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
561            Timestamp(Nanosecond, None) => DataType::Timestamp,
562            Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
563            Interval(MonthDayNano) => DataType::Interval,
564            Utf8 => DataType::Varchar,
565            Utf8View => DataType::Varchar,
566            Binary => DataType::Bytea,
567            LargeUtf8 => self.from_large_utf8()?,
568            LargeBinary => self.from_large_binary()?,
569            List(field) => DataType::list(self.from_field(field)?),
570            Struct(fields) => DataType::Struct(self.from_fields(fields)?),
571            Map(field, _is_sorted) => {
572                let entries = self.from_field(field)?;
573                DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
574                    ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
575                })?)
576            }
577            t => {
578                return Err(ArrayError::from_arrow(format!(
579                    "unsupported arrow data type: {t:?}"
580                )));
581            }
582        })
583    }
584
585    /// Converts Arrow `LargeUtf8` type to RisingWave data type.
586    fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
587        Ok(DataType::Varchar)
588    }
589
590    /// Converts Arrow `LargeBinary` type to RisingWave data type.
591    fn from_large_binary(&self) -> Result<DataType, ArrayError> {
592        Ok(DataType::Bytea)
593    }
594
595    /// Converts Arrow extension type to RisingWave `DataType`.
596    fn from_extension_type(
597        &self,
598        type_name: &str,
599        physical_type: &arrow_schema::DataType,
600    ) -> Result<DataType, ArrayError> {
601        match (type_name, physical_type) {
602            ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
603            ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
604            _ => Err(ArrayError::from_arrow(format!(
605                "unsupported extension type: {type_name:?}"
606            ))),
607        }
608    }
609
610    /// Converts Arrow `Array` to RisingWave `ArrayImpl`.
611    fn from_array(
612        &self,
613        field: &arrow_schema::Field,
614        array: &arrow_array::ArrayRef,
615    ) -> Result<ArrayImpl, ArrayError> {
616        use arrow_schema::DataType::*;
617        use arrow_schema::IntervalUnit::*;
618        use arrow_schema::TimeUnit::*;
619
620        // extension type
621        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
622            return self.from_extension_array(type_name, array);
623        }
624
625        // Struct projection for file source (Parquet): allow Arrow struct to be a superset of the
626        // expected struct fields. We align fields by name and ignore extra fields.
627        //
628        // Only use projection when Arrow struct differs from expected (superset, reordered, or
629        // different field names). If they match exactly, fall through to the normal path to
630        // avoid unnecessary overhead and potential issues with UDF/other paths.
631        if let (
632            arrow_schema::DataType::Struct(expected_fields),
633            arrow_schema::DataType::Struct(actual_fields),
634        ) = (field.data_type(), array.data_type())
635        {
636            let dominated = Self::struct_fields_dominated(expected_fields, actual_fields);
637            if dominated {
638                let struct_array: &arrow_array::StructArray =
639                    array.as_any().downcast_ref().unwrap();
640                return self.from_struct_array_projected(expected_fields, struct_array);
641            }
642            // else: fields match exactly, fall through to normal Struct(_) path below
643        }
644        match array.data_type() {
645            Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
646            Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
647            Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
648            Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
649            Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
650            UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
651            UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
652            UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
653
654            UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
655            Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
656            Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
657            Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
658            Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
659            Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
660            Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
661            Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
662            Timestamp(Second, None) => {
663                self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
664            }
665            Timestamp(Second, Some(_)) => {
666                self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
667            }
668            Timestamp(Millisecond, None) => {
669                self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
670            }
671            Timestamp(Millisecond, Some(_)) => {
672                self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
673            }
674            Timestamp(Microsecond, None) => {
675                self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
676            }
677            Timestamp(Microsecond, Some(_)) => {
678                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
679            }
680            Timestamp(Nanosecond, None) => {
681                self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
682            }
683            Timestamp(Nanosecond, Some(_)) => {
684                self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
685            }
686            Interval(MonthDayNano) => {
687                self.from_interval_array(array.as_any().downcast_ref().unwrap())
688            }
689            Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
690            Utf8View => self.from_utf8_view_array(array.as_any().downcast_ref().unwrap()),
691            Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
692            LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
693            LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
694            List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
695            Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
696            Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
697            t => Err(ArrayError::from_arrow(format!(
698                "unsupported arrow data type: {t:?}",
699            ))),
700        }
701    }
702
703    /// Converts Arrow extension array to RisingWave `ArrayImpl`.
704    fn from_extension_array(
705        &self,
706        type_name: &str,
707        array: &arrow_array::ArrayRef,
708    ) -> Result<ArrayImpl, ArrayError> {
709        match type_name {
710            "arrowudf.decimal" => {
711                let array: &arrow_array::StringArray =
712                    array.as_any().downcast_ref().ok_or_else(|| {
713                        ArrayError::from_arrow(
714                            "expected string array for `arrowudf.decimal`".to_owned(),
715                        )
716                    })?;
717                Ok(ArrayImpl::Decimal(array.try_into()?))
718            }
719            "arrowudf.json" => {
720                let array: &arrow_array::StringArray =
721                    array.as_any().downcast_ref().ok_or_else(|| {
722                        ArrayError::from_arrow(
723                            "expected string array for `arrowudf.json`".to_owned(),
724                        )
725                    })?;
726                Ok(ArrayImpl::Jsonb(array.try_into()?))
727            }
728            _ => Err(ArrayError::from_arrow(format!(
729                "unsupported extension type: {type_name:?}"
730            ))),
731        }
732    }
733
734    fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
735        Ok(ArrayImpl::Bool(array.into()))
736    }
737
738    fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
739        Ok(ArrayImpl::Int16(array.into()))
740    }
741
742    fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
743        Ok(ArrayImpl::Int16(array.into()))
744    }
745
746    fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
747        Ok(ArrayImpl::Int16(array.into()))
748    }
749
750    fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
751        Ok(ArrayImpl::Int32(array.into()))
752    }
753
754    fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
755        Ok(ArrayImpl::Int64(array.into()))
756    }
757
758    fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
759        Ok(ArrayImpl::Int32(array.into()))
760    }
761
762    fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
763        Ok(ArrayImpl::Int64(array.into()))
764    }
765
766    fn from_int256_array(
767        &self,
768        array: &arrow_array::Decimal256Array,
769    ) -> Result<ArrayImpl, ArrayError> {
770        Ok(ArrayImpl::Int256(array.into()))
771    }
772
773    fn from_decimal128_array(
774        &self,
775        array: &arrow_array::Decimal128Array,
776    ) -> Result<ArrayImpl, ArrayError> {
777        Ok(ArrayImpl::Decimal(array.try_into()?))
778    }
779
780    fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
781        Ok(ArrayImpl::Decimal(array.try_into()?))
782    }
783
784    fn from_float16_array(
785        &self,
786        array: &arrow_array::Float16Array,
787    ) -> Result<ArrayImpl, ArrayError> {
788        Ok(ArrayImpl::Float32(array.try_into()?))
789    }
790
791    fn from_float32_array(
792        &self,
793        array: &arrow_array::Float32Array,
794    ) -> Result<ArrayImpl, ArrayError> {
795        Ok(ArrayImpl::Float32(array.into()))
796    }
797
798    fn from_float64_array(
799        &self,
800        array: &arrow_array::Float64Array,
801    ) -> Result<ArrayImpl, ArrayError> {
802        Ok(ArrayImpl::Float64(array.into()))
803    }
804
805    fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
806        Ok(ArrayImpl::Date(array.into()))
807    }
808
809    fn from_time64us_array(
810        &self,
811        array: &arrow_array::Time64MicrosecondArray,
812    ) -> Result<ArrayImpl, ArrayError> {
813        Ok(ArrayImpl::Time(array.into()))
814    }
815
816    fn from_timestampsecond_array(
817        &self,
818        array: &arrow_array::TimestampSecondArray,
819    ) -> Result<ArrayImpl, ArrayError> {
820        Ok(ArrayImpl::Timestamp(array.into()))
821    }
822    fn from_timestampsecond_some_array(
823        &self,
824        array: &arrow_array::TimestampSecondArray,
825    ) -> Result<ArrayImpl, ArrayError> {
826        Ok(ArrayImpl::Timestamptz(array.into()))
827    }
828
829    fn from_timestampms_array(
830        &self,
831        array: &arrow_array::TimestampMillisecondArray,
832    ) -> Result<ArrayImpl, ArrayError> {
833        Ok(ArrayImpl::Timestamp(array.into()))
834    }
835
836    fn from_timestampms_some_array(
837        &self,
838        array: &arrow_array::TimestampMillisecondArray,
839    ) -> Result<ArrayImpl, ArrayError> {
840        Ok(ArrayImpl::Timestamptz(array.into()))
841    }
842
843    fn from_timestampus_array(
844        &self,
845        array: &arrow_array::TimestampMicrosecondArray,
846    ) -> Result<ArrayImpl, ArrayError> {
847        Ok(ArrayImpl::Timestamp(array.into()))
848    }
849
850    fn from_timestampus_some_array(
851        &self,
852        array: &arrow_array::TimestampMicrosecondArray,
853    ) -> Result<ArrayImpl, ArrayError> {
854        Ok(ArrayImpl::Timestamptz(array.into()))
855    }
856
857    fn from_timestampns_array(
858        &self,
859        array: &arrow_array::TimestampNanosecondArray,
860    ) -> Result<ArrayImpl, ArrayError> {
861        Ok(ArrayImpl::Timestamp(array.into()))
862    }
863
864    fn from_timestampns_some_array(
865        &self,
866        array: &arrow_array::TimestampNanosecondArray,
867    ) -> Result<ArrayImpl, ArrayError> {
868        Ok(ArrayImpl::Timestamptz(array.into()))
869    }
870
871    fn from_interval_array(
872        &self,
873        array: &arrow_array::IntervalMonthDayNanoArray,
874    ) -> Result<ArrayImpl, ArrayError> {
875        Ok(ArrayImpl::Interval(array.into()))
876    }
877
878    fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
879        Ok(ArrayImpl::Utf8(array.into()))
880    }
881
882    fn from_utf8_view_array(
883        &self,
884        array: &arrow_array::StringViewArray,
885    ) -> Result<ArrayImpl, ArrayError> {
886        Ok(ArrayImpl::Utf8(array.into()))
887    }
888
889    fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
890        Ok(ArrayImpl::Bytea(array.into()))
891    }
892
893    fn from_large_utf8_array(
894        &self,
895        array: &arrow_array::LargeStringArray,
896    ) -> Result<ArrayImpl, ArrayError> {
897        Ok(ArrayImpl::Utf8(array.into()))
898    }
899
900    fn from_large_binary_array(
901        &self,
902        array: &arrow_array::LargeBinaryArray,
903    ) -> Result<ArrayImpl, ArrayError> {
904        Ok(ArrayImpl::Bytea(array.into()))
905    }
906
907    fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
908        use arrow_array::Array;
909        let arrow_schema::DataType::List(field) = array.data_type() else {
910            panic!("nested field types cannot be determined.");
911        };
912        Ok(ArrayImpl::List(ListArray {
913            value: Box::new(self.from_array(field, array.values())?),
914            bitmap: match array.nulls() {
915                Some(nulls) => nulls.iter().collect(),
916                None => Bitmap::ones(array.len()),
917            },
918            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
919        }))
920    }
921
922    fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
923        use arrow_array::Array;
924        let arrow_schema::DataType::Struct(fields) = array.data_type() else {
925            panic!("nested field types cannot be determined.");
926        };
927        Ok(ArrayImpl::Struct(StructArray::new(
928            self.from_fields(fields)?,
929            array
930                .columns()
931                .iter()
932                .zip_eq_fast(fields)
933                .map(|(array, field)| self.from_array(field, array).map(Arc::new))
934                .try_collect()?,
935            (0..array.len()).map(|i| array.is_valid(i)).collect(),
936        )))
937    }
938
939    /// Returns `true` if all expected fields are present in `actual_fields`, and `actual_fields`
940    /// has more fields or has them in a different order.
941    ///
942    /// This is used to decide whether to use `from_struct_array_projected` (projection needed)
943    /// or fall back to the normal `from_struct_array` path (exact match).
944    fn struct_fields_dominated(
945        expected_fields: &arrow_schema::Fields,
946        actual_fields: &arrow_schema::Fields,
947    ) -> bool {
948        // Fast path: if lengths are equal and names match in order, no projection needed
949        if expected_fields.len() == actual_fields.len() {
950            let all_match = expected_fields
951                .iter()
952                .zip_eq_fast(actual_fields.iter())
953                .all(|(e, a)| e.name() == a.name());
954            if all_match {
955                return false; // exact match, use normal path
956            }
957        }
958        // Check that all expected fields exist in actual (by name)
959        let actual_names: std::collections::HashSet<&str> =
960            actual_fields.iter().map(|f| f.name().as_str()).collect();
961        expected_fields
962            .iter()
963            .all(|e| actual_names.contains(e.name().as_str()))
964    }
965
966    /// Converts Arrow `StructArray` to RisingWave `StructArray` according to the expected fields.
967    ///
968    /// This is mainly used for Parquet file source, where the upstream struct may contain extra
969    /// fields. The conversion aligns fields by name, ignores extra fields, and keeps the expected
970    /// field order.
971    fn from_struct_array_projected(
972        &self,
973        expected_fields: &arrow_schema::Fields,
974        array: &arrow_array::StructArray,
975    ) -> Result<ArrayImpl, ArrayError> {
976        use std::collections::HashMap;
977
978        use arrow_array::Array;
979
980        let arrow_schema::DataType::Struct(actual_fields) = array.data_type() else {
981            panic!("nested field types cannot be determined.");
982        };
983
984        let actual_name_to_index: HashMap<&str, usize> = actual_fields
985            .iter()
986            .enumerate()
987            .map(|(idx, f)| (f.name().as_str(), idx))
988            .collect();
989
990        let len = array.len();
991        let projected_columns = expected_fields
992            .iter()
993            .map(|expected_field| {
994                if let Some(&idx) = actual_name_to_index.get(expected_field.name().as_str()) {
995                    let child = array.columns()[idx].clone();
996                    self.from_array(expected_field, &child).map(Arc::new)
997                } else {
998                    // Field missing in Arrow struct. Fill SQL NULL with the expected RW type.
999                    let rw_ty = self.from_field(expected_field)?;
1000                    let mut builder = ArrayBuilderImpl::with_type(len, rw_ty);
1001                    builder.append_n(len, Datum::None);
1002                    Ok(Arc::new(builder.finish()))
1003                }
1004            })
1005            .try_collect()?;
1006
1007        Ok(ArrayImpl::Struct(StructArray::new(
1008            self.from_fields(expected_fields)?,
1009            projected_columns,
1010            (0..len).map(|i| array.is_valid(i)).collect(),
1011        )))
1012    }
1013
1014    fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
1015        use arrow_array::Array;
1016        let struct_array = self.from_struct_array(array.entries())?;
1017        let list_array = ListArray {
1018            value: Box::new(struct_array),
1019            bitmap: match array.nulls() {
1020                Some(nulls) => nulls.iter().collect(),
1021                None => Bitmap::ones(array.len()),
1022            },
1023            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
1024        };
1025
1026        Ok(ArrayImpl::Map(MapArray { inner: list_array }))
1027    }
1028}
1029
1030impl From<&Bitmap> for arrow_buffer::NullBuffer {
1031    fn from(bitmap: &Bitmap) -> Self {
1032        bitmap.iter().collect()
1033    }
1034}
1035
/// Implement bi-directional `From` between concrete array types.
///
/// Generated impls, for a RisingWave array type `$ArrayType` and an Arrow array type
/// `$ArrowType`:
/// - `From<&$ArrayType> for $ArrowType`
/// - `From<&$ArrowType> for $ArrayType`
/// - `From<&[$ArrowType]> for $ArrayType`, concatenating all slices in order
///
/// The plain form collects the iterated items unchanged; the `@map` form converts every
/// non-null element through `FromIntoArrow`.
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source.iter().collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source.iter().collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .collect::<Self>()
            }
        }
    };
    // convert values using FromIntoArrow
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|scalar| scalar.into_arrow()))
                    .collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                        })
                    })
                    .collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                        })
                    })
                    .collect::<Self>()
            }
        }
    };
}
1089
/// Used to convert different types.
///
/// Generates the same three `From` impls as `converts!`, but every non-null element is cast
/// with `as`: to `$ToType` when producing the Arrow array, and to `$FromType` when producing
/// the RisingWave array. Note that `as` casts never fail; a narrowing cast truncates.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|scalar| scalar as $ToType))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|raw| raw as $FromType))
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| item.map(|raw| raw as $FromType))
                    .collect()
            }
        }
    };
}
1120
/// Implements bi-directional `From` between a RisingWave timestamp-like array and an Arrow
/// timestamp array of a fixed time unit.
///
/// Every non-null element is converted through `FromIntoArrowWithUnit`, passing `$time_unit`
/// so the raw Arrow value is interpreted (or produced) in the right resolution. A slice of
/// Arrow arrays is concatenated in order.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|scalar| scalar.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1157
// Conversions between RisingWave arrays and Arrow arrays. The plain form collects items
// unchanged; `@map` converts each element through `FromIntoArrow`.
converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
converts!(I64Array, arrow_array::Int64Array);
converts!(F32Array, arrow_array::Float32Array, @map);
converts!(F64Array, arrow_array::Float64Array, @map);
converts!(BytesArray, arrow_array::BinaryArray);
converts!(BytesArray, arrow_array::LargeBinaryArray);
converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(Utf8Array, arrow_array::StringViewArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
// `SerialArray` uses the same Arrow `Int64Array` representation as `I64Array`.
converts!(SerialArray, arrow_array::Int64Array, @map);

// Arrow integer types without a same-width RisingWave array: elements are cast with `as`
// (third argument: RisingWave element type, fourth: Arrow element type).
converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);

// `TimestampArray` against each Arrow timestamp resolution.
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

// `TimestamptzArray` against each Arrow timestamp resolution.
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1188
/// Converts RisingWave value from and into Arrow value.
///
/// Implemented for scalar types whose Arrow representation needs no extra context; it is the
/// per-element conversion used by the `@map` arm of `converts!`.
trait FromIntoArrow {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// Builds the RisingWave value from its Arrow element representation.
    fn from_arrow(value: Self::ArrowType) -> Self;
    /// Produces the Arrow element representation of this RisingWave value.
    fn into_arrow(self) -> Self::ArrowType;
}
1196
/// Converts RisingWave value from and into Arrow value.
/// Specifically used for converting timestamp types according to timeunit.
trait FromIntoArrowWithUnit {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
    type TimestampType;
    /// Builds the RisingWave value from an Arrow element expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
    /// Produces the Arrow element for this value, expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1206
1207impl FromIntoArrow for Serial {
1208    type ArrowType = i64;
1209
1210    fn from_arrow(value: Self::ArrowType) -> Self {
1211        value.into()
1212    }
1213
1214    fn into_arrow(self) -> Self::ArrowType {
1215        self.into()
1216    }
1217}
1218
1219impl FromIntoArrow for F32 {
1220    type ArrowType = f32;
1221
1222    fn from_arrow(value: Self::ArrowType) -> Self {
1223        value.into()
1224    }
1225
1226    fn into_arrow(self) -> Self::ArrowType {
1227        self.into()
1228    }
1229}
1230
1231impl FromIntoArrow for F64 {
1232    type ArrowType = f64;
1233
1234    fn from_arrow(value: Self::ArrowType) -> Self {
1235        value.into()
1236    }
1237
1238    fn into_arrow(self) -> Self::ArrowType {
1239        self.into()
1240    }
1241}
1242
1243impl FromIntoArrow for Date {
1244    type ArrowType = i32;
1245
1246    fn from_arrow(value: Self::ArrowType) -> Self {
1247        Date(arrow_array::types::Date32Type::to_naive_date(value))
1248    }
1249
1250    fn into_arrow(self) -> Self::ArrowType {
1251        arrow_array::types::Date32Type::from_naive_date(self.0)
1252    }
1253}
1254
1255impl FromIntoArrow for Time {
1256    type ArrowType = i64;
1257
1258    fn from_arrow(value: Self::ArrowType) -> Self {
1259        Time(
1260            NaiveTime::from_num_seconds_from_midnight_opt(
1261                (value / 1_000_000) as _,
1262                (value % 1_000_000 * 1000) as _,
1263            )
1264            .unwrap(),
1265        )
1266    }
1267
1268    fn into_arrow(self) -> Self::ArrowType {
1269        self.0
1270            .signed_duration_since(NaiveTime::default())
1271            .num_microseconds()
1272            .unwrap()
1273    }
1274}
1275
1276impl FromIntoArrowWithUnit for Timestamp {
1277    type ArrowType = i64;
1278    type TimestampType = TimeUnit;
1279
1280    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1281        match time_unit {
1282            TimeUnit::Second => {
1283                Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1284            }
1285            TimeUnit::Millisecond => {
1286                Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1287            }
1288            TimeUnit::Microsecond => {
1289                Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1290            }
1291            TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1292        }
1293    }
1294
1295    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1296        match time_unit {
1297            TimeUnit::Second => self.0.and_utc().timestamp(),
1298            TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1299            TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1300            TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1301        }
1302    }
1303}
1304
1305impl FromIntoArrowWithUnit for Timestamptz {
1306    type ArrowType = i64;
1307    type TimestampType = TimeUnit;
1308
1309    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1310        match time_unit {
1311            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1312            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1313            TimeUnit::Microsecond => Timestamptz::from_micros(value),
1314            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1315        }
1316    }
1317
1318    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1319        match time_unit {
1320            TimeUnit::Second => self.timestamp(),
1321            TimeUnit::Millisecond => self.timestamp_millis(),
1322            TimeUnit::Microsecond => self.timestamp_micros(),
1323            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1324        }
1325    }
1326}
1327
1328impl FromIntoArrow for Interval {
1329    type ArrowType = ArrowIntervalType;
1330
1331    fn from_arrow(value: Self::ArrowType) -> Self {
1332        Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1333    }
1334
1335    fn into_arrow(self) -> Self::ArrowType {
1336        ArrowIntervalType {
1337            months: self.months(),
1338            days: self.days(),
1339            // TODO: this may overflow and we need `try_into`
1340            nanoseconds: self.usecs() * 1000,
1341        }
1342    }
1343}
1344
1345impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1346    fn from(array: &DecimalArray) -> Self {
1347        let mut builder =
1348            arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1349        for value in array.iter() {
1350            builder.append_option(value.map(|d| d.to_string()));
1351        }
1352        builder.finish()
1353    }
1354}
1355
1356impl From<&DecimalArray> for arrow_array::StringArray {
1357    fn from(array: &DecimalArray) -> Self {
1358        let mut builder =
1359            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1360        for value in array.iter() {
1361            builder.append_option(value.map(|d| d.to_string()));
1362        }
1363        builder.finish()
1364    }
1365}
1366
1367// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
1368impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1369    type Error = ArrayError;
1370
1371    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1372        if array.scale() < 0 {
1373            bail!("support negative scale for arrow decimal")
1374        }
1375
1376        // Calculate the max value based on the Arrow decimal's precision
1377        // When writing Inf to Arrow Decimal128(precision, scale), we use 10^precision - 1
1378        let precision = array.precision();
1379        let max_value = 10_i128.pow(precision as u32) - 1;
1380
1381        let from_arrow = |value| {
1382            const NAN: i128 = i128::MIN + 1;
1383            let res = match value {
1384                // Check for special values using Arrow Decimal's max value, not i128::MAX
1385                NAN => Decimal::NaN,
1386                v if v == max_value => Decimal::PositiveInf,
1387                v if v == -max_value => Decimal::NegativeInf,
1388                i128::MAX => Decimal::PositiveInf, // Fallback for old data
1389                i128::MIN => Decimal::NegativeInf, // Fallback for old data
1390                _ => Decimal::truncated_i128_and_scale(value, array.scale() as u32)
1391                    .ok_or_else(|| ArrayError::from_arrow("decimal overflow"))?,
1392            };
1393            Ok(res)
1394        };
1395        array
1396            .iter()
1397            .map(|o| o.map(from_arrow).transpose())
1398            .collect::<Result<Self, Self::Error>>()
1399    }
1400}
1401
1402// Since RisingWave does not support UInt type, convert UInt64Array to Decimal.
1403impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1404    type Error = ArrayError;
1405
1406    fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1407        let from_arrow = |value| {
1408            // Convert the value to a Decimal with scale 0
1409            let res = Decimal::from(value);
1410            Ok(res)
1411        };
1412
1413        // Map over the array and convert each value
1414        array
1415            .iter()
1416            .map(|o| o.map(from_arrow).transpose())
1417            .collect::<Result<Self, Self::Error>>()
1418    }
1419}
1420
1421impl TryFrom<&arrow_array::Float16Array> for F32Array {
1422    type Error = ArrayError;
1423
1424    fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1425        let from_arrow = |value| Ok(f32::from(value));
1426
1427        array
1428            .iter()
1429            .map(|o| o.map(from_arrow).transpose())
1430            .collect::<Result<Self, Self::Error>>()
1431    }
1432}
1433
1434impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1435    type Error = ArrayError;
1436
1437    fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1438        array
1439            .iter()
1440            .map(|o| {
1441                o.map(|s| {
1442                    let s = std::str::from_utf8(s)
1443                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1444                    s.parse()
1445                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1446                })
1447                .transpose()
1448            })
1449            .try_collect()
1450    }
1451}
1452
1453impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1454    type Error = ArrayError;
1455
1456    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1457        array
1458            .iter()
1459            .map(|o| {
1460                o.map(|s| {
1461                    s.parse()
1462                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1463                })
1464                .transpose()
1465            })
1466            .try_collect()
1467    }
1468}
1469
1470impl From<&JsonbArray> for arrow_array::StringArray {
1471    fn from(array: &JsonbArray) -> Self {
1472        let mut builder =
1473            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1474        for value in array.iter() {
1475            match value {
1476                Some(jsonb) => {
1477                    write!(&mut builder, "{}", jsonb).unwrap();
1478                    builder.append_value("");
1479                }
1480                None => builder.append_null(),
1481            }
1482        }
1483        builder.finish()
1484    }
1485}
1486
1487impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1488    type Error = ArrayError;
1489
1490    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1491        array
1492            .iter()
1493            .map(|o| {
1494                o.map(|s| {
1495                    s.parse()
1496                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1497                })
1498                .transpose()
1499            })
1500            .try_collect()
1501    }
1502}
1503
1504impl From<&IntervalArray> for arrow_array::StringArray {
1505    fn from(array: &IntervalArray) -> Self {
1506        let mut builder =
1507            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1508        for value in array.iter() {
1509            match value {
1510                Some(interval) => {
1511                    write!(&mut builder, "{}", interval).unwrap();
1512                    builder.append_value("");
1513                }
1514                None => builder.append_null(),
1515            }
1516        }
1517        builder.finish()
1518    }
1519}
1520
1521impl From<&JsonbArray> for arrow_array::LargeStringArray {
1522    fn from(array: &JsonbArray) -> Self {
1523        let mut builder =
1524            arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1525        for value in array.iter() {
1526            match value {
1527                Some(jsonb) => {
1528                    write!(&mut builder, "{}", jsonb).unwrap();
1529                    builder.append_value("");
1530                }
1531                None => builder.append_null(),
1532            }
1533        }
1534        builder.finish()
1535    }
1536}
1537
1538impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1539    type Error = ArrayError;
1540
1541    fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1542        array
1543            .iter()
1544            .map(|o| {
1545                o.map(|s| {
1546                    s.parse()
1547                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1548                })
1549                .transpose()
1550            })
1551            .try_collect()
1552    }
1553}
1554
1555impl From<arrow_buffer::i256> for Int256 {
1556    fn from(value: arrow_buffer::i256) -> Self {
1557        let buffer = value.to_be_bytes();
1558        Int256::from_be_bytes(buffer)
1559    }
1560}
1561
1562impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1563    fn from(val: Int256Ref<'a>) -> Self {
1564        let buffer = val.to_be_bytes();
1565        arrow_buffer::i256::from_be_bytes(buffer)
1566    }
1567}
1568
1569impl From<&Int256Array> for arrow_array::Decimal256Array {
1570    fn from(array: &Int256Array) -> Self {
1571        array
1572            .iter()
1573            .map(|o| o.map(arrow_buffer::i256::from))
1574            .collect()
1575    }
1576}
1577
1578impl From<&arrow_array::Decimal256Array> for Int256Array {
1579    fn from(array: &arrow_array::Decimal256Array) -> Self {
1580        let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1581
1582        values
1583            .iter()
1584            .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1585            .collect()
1586    }
1587}
1588
1589/// This function checks whether the schema of a Parquet file matches the user-defined schema in RisingWave.
1590/// It handles the following special cases:
1591/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `Timestamp` type.
1592/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `Timestamptz` type.
1593/// - Since RisingWave does not have an `UInt` type:
1594///   - Arrow's `UInt8` matches with RisingWave's `Int16`.
1595///   - Arrow's `UInt16` matches with RisingWave's `Int32`.
1596///   - Arrow's `UInt32` matches with RisingWave's `Int64`.
1597///   - Arrow's `UInt64` matches with RisingWave's `Decimal`.
1598/// - Arrow's `Float16` matches with RisingWave's `Float32`.
1599///
1600/// Nested data type matching:
1601/// - Struct: Arrow's `Struct` type matches with RisingWave's `Struct` type recursively, requiring that all expected fields exist and match by name and type. Extra Arrow fields are allowed.
1602/// - List: Arrow's `List` type matches with RisingWave's `List` type recursively, requiring the same element type.
1603/// - Map: Arrow's `Map` type matches with RisingWave's `Map` type recursively, requiring the key and value types to match, and the inner struct must have exactly two fields named "key" and "value".
1604pub fn is_parquet_schema_match_source_schema(
1605    arrow_data_type: &arrow_schema::DataType,
1606    rw_data_type: &crate::types::DataType,
1607) -> bool {
1608    use arrow_schema::DataType as ArrowType;
1609
1610    use crate::types::{DataType as RwType, MapType, StructType};
1611
1612    match (arrow_data_type, rw_data_type) {
1613        // Primitive type matching and special cases
1614        (ArrowType::Boolean, RwType::Boolean)
1615        | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1616        | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1617        | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1618        | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1619        | (ArrowType::Decimal256(_, _), RwType::Int256)
1620        | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1621        | (ArrowType::Float64, RwType::Float64)
1622        | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1623        | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1624        | (ArrowType::Date32, RwType::Date)
1625        | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1626        | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1627        | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1628        | (ArrowType::Binary | ArrowType::LargeBinary, RwType::Bytea) => true,
1629
1630        // Struct type recursive matching
1631        // Arrow's Struct matches RisingWave's Struct if all expected field names exist and types
1632        // match recursively. Extra Arrow fields are allowed and field order is ignored.
1633        (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1634            if arrow_fields.len() < rw_struct.len() {
1635                return false;
1636            }
1637            for (rw_name, rw_ty) in rw_struct.iter() {
1638                let Some(arrow_field) = arrow_fields.iter().find(|f| f.name() == rw_name) else {
1639                    return false;
1640                };
1641                if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1642                    return false;
1643                }
1644            }
1645            true
1646        }
1647        // List type recursive matching
1648        // Arrow's List matches RisingWave's List if the element type matches recursively
1649        (ArrowType::List(arrow_field), RwType::List(rw_list_ty)) => {
1650            is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_list_ty.elem())
1651        }
1652        // Map type recursive matching
1653        // Arrow's Map matches RisingWave's Map if the key and value types match recursively,
1654        // and the inner struct has exactly two fields named "key" and "value"
1655        (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1656            if let ArrowType::Struct(fields) = arrow_field.data_type() {
1657                if fields.len() != 2 {
1658                    return false;
1659                }
1660                let key_field = &fields[0];
1661                let value_field = &fields[1];
1662                if key_field.name() != "key" || value_field.name() != "value" {
1663                    return false;
1664                }
1665                let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1666                is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1667                    && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1668            } else {
1669                false
1670            }
1671        }
1672        // Fallback: types do not match
1673        _ => false,
1674    }
1675}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};
    use itertools::Itertools;

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Builds an Arrow struct type from `(name, type)` pairs; every field is nullable.
    fn nullable_arrow_struct(fields: Vec<(&str, ArrowType)>) -> ArrowType {
        let fields: Vec<ArrowField> = fields
            .into_iter()
            .map(|(name, ty)| ArrowField::new(name, ty, true))
            .collect();
        ArrowType::Struct(fields.into())
    }

    /// Builds an Arrow `map<utf8, int32>` whose entry struct names its first field `key_name`
    /// and its second field "value".
    fn arrow_utf8_to_int32_map(key_name: &str) -> ArrowType {
        let entries = ArrowType::Struct(
            vec![
                ArrowField::new(key_name, ArrowType::Utf8, false),
                ArrowField::new("value", ArrowType::Int32, true),
            ]
            .into(),
        );
        ArrowType::Map(Arc::new(ArrowField::new("entries", entries, false)), false)
    }

    #[test]
    fn test_struct_schema_match() {
        // RW: struct<f1 Double, f2 Varchar>
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));

        // Arrow: struct<f1: Double, f2: Utf8> -- identical layout.
        let same_layout =
            nullable_arrow_struct(vec![("f1", ArrowType::Float64), ("f2", ArrowType::Utf8)]);
        assert!(is_parquet_schema_match_source_schema(
            &same_layout,
            &rw_struct
        ));

        // Arrow carries an extra field: still a match.
        let superset = nullable_arrow_struct(vec![
            ("f1", ArrowType::Float64),
            ("f2", ArrowType::Utf8),
            ("f3", ArrowType::Int32),
        ]);
        assert!(is_parquet_schema_match_source_schema(&superset, &rw_struct));

        // Arrow lists the fields in a different order: still a match.
        let reordered =
            nullable_arrow_struct(vec![("f2", ArrowType::Utf8), ("f1", ArrowType::Float64)]);
        assert!(is_parquet_schema_match_source_schema(
            &reordered,
            &rw_struct
        ));

        // Arrow is missing `f2` (it has `f3` instead): mismatch.
        let renamed =
            nullable_arrow_struct(vec![("f1", ArrowType::Float64), ("f3", ArrowType::Utf8)]);
        assert!(!is_parquet_schema_match_source_schema(
            &renamed,
            &rw_struct
        ));
    }

    #[test]
    fn test_struct_projection_from_arrow() {
        struct Dummy;
        impl FromArrow for Dummy {}

        // Actual Arrow struct: struct<foo:int32, bar:utf8, baz:int32>
        let file_fields: arrow_schema::Fields = vec![
            ArrowField::new("foo", ArrowType::Int32, true),
            ArrowField::new("bar", ArrowType::Utf8, true),
            ArrowField::new("baz", ArrowType::Int32, true),
        ]
        .into();
        let foo_col: arrow_array::ArrayRef =
            Arc::new(arrow_array::Int32Array::from(vec![Some(10), Some(20)]));
        let bar_col: arrow_array::ArrayRef =
            Arc::new(arrow_array::StringArray::from(vec![Some("a"), Some("b")]));
        let baz_col: arrow_array::ArrayRef =
            Arc::new(arrow_array::Int32Array::from(vec![Some(100), Some(200)]));
        let file_struct: arrow_array::ArrayRef = Arc::new(arrow_array::StructArray::new(
            file_fields,
            vec![foo_col, bar_col, baz_col],
            None,
        ));

        // Expected struct in RW schema (via to_arrow_field): struct<foo:int32, bar:utf8>
        let expected_field = ArrowField::new(
            "s",
            nullable_arrow_struct(vec![("foo", ArrowType::Int32), ("bar", ArrowType::Utf8)]),
            true,
        );

        let converted = Dummy.from_array(&expected_field, &file_struct).unwrap();
        let ArrayImpl::Struct(rw_struct_array) = converted else {
            panic!("expected RW StructArray");
        };

        // Only the two expected fields survive, in the expected order.
        let DataType::Struct(struct_ty) = rw_struct_array.data_type() else {
            panic!("expected RW struct type");
        };
        assert_eq!(struct_ty.len(), 2);
        let field_names = struct_ty.iter().map(|(name, _)| name).collect_vec();
        assert_eq!(field_names, vec!["foo", "bar"]);

        let first_row = rw_struct_array.value_at(0).unwrap().to_owned_scalar();
        let second_row = rw_struct_array.value_at(1).unwrap().to_owned_scalar();
        assert_eq!(
            first_row,
            StructValue::new(vec![
                Some(ScalarImpl::Int32(10)),
                Some(ScalarImpl::Utf8("a".into()))
            ])
        );
        assert_eq!(
            second_row,
            StructValue::new(vec![
                Some(ScalarImpl::Int32(20)),
                Some(ScalarImpl::Utf8("b".into()))
            ])
        );
    }

    #[test]
    fn test_list_schema_match() {
        // Arrow: list<double>
        let arrow_list = ArrowType::List(Arc::new(ArrowField::new(
            "item",
            ArrowType::Float64,
            true,
        )));

        // RW: list<double> matches.
        let rw_double_list = RwType::Float64.list();
        assert!(is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_double_list
        ));

        // RW: list<int32> has a different element type.
        let rw_int_list = RwType::Int32.list();
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_int_list
        ));
    }

    #[test]
    fn test_map_schema_match() {
        // Arrow: map<utf8, int32>
        let arrow_map = arrow_utf8_to_int32_map("key");

        // RW: map<varchar, int32>
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        // Key type does not match
        let rw_map_int_key = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_map,
            &rw_map_int_key
        ));

        // Value type does not match
        let rw_map_float_value = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_map,
            &rw_map_float_value
        ));

        // Arrow inner struct field name does not match
        let arrow_map_bad_key_name = arrow_utf8_to_int32_map("k");
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_map_bad_key_name,
            &rw_map
        ));
    }

    #[test]
    fn bool() {
        let original = BoolArray::from_iter([None, Some(false), Some(true)]);
        let as_arrow = arrow_array::BooleanArray::from(&original);
        let round_trip = BoolArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn i16() {
        let original = I16Array::from_iter([None, Some(-7), Some(25)]);
        let as_arrow = arrow_array::Int16Array::from(&original);
        let round_trip = I16Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn i32() {
        let original = I32Array::from_iter([None, Some(-7), Some(25)]);
        let as_arrow = arrow_array::Int32Array::from(&original);
        let round_trip = I32Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn i64() {
        let original = I64Array::from_iter([None, Some(-7), Some(25)]);
        let as_arrow = arrow_array::Int64Array::from(&original);
        let round_trip = I64Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn f32() {
        let original = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let as_arrow = arrow_array::Float32Array::from(&original);
        let round_trip = F32Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn f64() {
        let original = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let as_arrow = arrow_array::Float64Array::from(&original);
        let round_trip = F64Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn int8() {
        // Arrow `Int8` widens to RW `Int16`.
        let source = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let widened: PrimitiveArray<i16> = (&source).into();
        assert_eq!(widened, expected);
    }

    #[test]
    fn uint8() {
        // Arrow `UInt8` widens to RW `Int16`.
        let source = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let expected: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let widened: PrimitiveArray<i16> = (&source).into();
        assert_eq!(widened, expected);
    }

    #[test]
    fn uint16() {
        // Arrow `UInt16` widens to RW `Int32`.
        let source = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let expected: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let widened: PrimitiveArray<i32> = (&source).into();
        assert_eq!(widened, expected);
    }

    #[test]
    fn uint32() {
        // Arrow `UInt32` widens to RW `Int64`.
        let source = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let expected: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let widened: PrimitiveArray<i64> = (&source).into();
        assert_eq!(widened, expected);
    }

    #[test]
    fn uint64() {
        // Arrow `UInt64` converts to RW `Decimal`.
        let source =
            arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let expected: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let widened: PrimitiveArray<Decimal> = (&source).try_into().unwrap();
        assert_eq!(widened, expected);
    }

    #[test]
    fn date() {
        let original = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let as_arrow = arrow_array::Date32Array::from(&original);
        let round_trip = DateArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn time() {
        let original =
            TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let as_arrow = arrow_array::Time64MicrosecondArray::from(&original);
        let round_trip = TimeArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn timestamp() {
        let original =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let as_arrow = arrow_array::TimestampMicrosecondArray::from(&original);
        let round_trip = TimestampArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn interval() {
        let positive = Interval::from_month_day_usec(1_000_000, 1_000, 1_000_000_000);
        let negative = Interval::from_month_day_usec(-1_000_000, -1_000, -1_000_000_000);
        let original = IntervalArray::from_iter([None, Some(positive), Some(negative)]);
        let as_arrow = arrow_array::IntervalMonthDayNanoArray::from(&original);
        let round_trip = IntervalArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn string() {
        let original = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let as_arrow = arrow_array::StringArray::from(&original);
        let round_trip = Utf8Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn binary() {
        let original = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let as_arrow = arrow_array::BinaryArray::from(&original);
        let round_trip = BytesArray::from(&as_arrow);
        assert_eq!(round_trip, original);
    }

    #[test]
    fn decimal() {
        let original = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);

        // Round trip through the binary encoding.
        let as_binary = arrow_array::LargeBinaryArray::from(&original);
        assert_eq!(DecimalArray::try_from(&as_binary).unwrap(), original);

        // Round trip through the string encoding.
        let as_string = arrow_array::StringArray::from(&original);
        assert_eq!(DecimalArray::try_from(&as_string).unwrap(), original);
    }

    #[test]
    fn jsonb() {
        let original = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);

        // Round trip through the large-string encoding.
        let as_large_string = arrow_array::LargeStringArray::from(&original);
        assert_eq!(JsonbArray::try_from(&as_large_string).unwrap(), original);

        // Round trip through the regular string encoding.
        let as_string = arrow_array::StringArray::from(&original);
        assert_eq!(JsonbArray::try_from(&as_string).unwrap(), original);
    }

    #[test]
    fn int256() {
        // Fresh copy of `i64::MAX` as an `Int256` for each factor of the products below.
        let big = || Int256::from(i64::MAX);

        let samples = [
            None,
            Some(Int256::from(1)),
            Some(big()),
            Some(big() * big()),
            Some(big() * big() * big()),
            Some(big() * big() * big() * big()),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let original = Int256Array::from_iter(
            samples
                .iter()
                .map(|sample| sample.as_ref().map(|v| v.as_scalar_ref())),
        );
        let as_arrow = arrow_array::Decimal256Array::from(&original);
        let round_trip = Int256Array::from(&as_arrow);
        assert_eq!(round_trip, original);
    }
}