risingwave_common/array/arrow/
arrow_impl.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Converts between arrays and Apache Arrow arrays.
16//!
//! This file acts as a template for conversion code between
//! arrays and different versions of Apache Arrow.
19//!
20//! The conversion logic will be implemented for the arrow version specified in the outer mod by
21//! `super::arrow_xxx`, such as `super::arrow_array`.
22//!
23//! When we want to implement the conversion logic for an arrow version, we first
24//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx`
25//! using the `use` clause, and then declare a sub-mod and set its file path with attribute
26//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to
27//! the new mod file, and the conversion logic can be implemented for the corresponding arrow
28//! version.
29//!
//! An example can be seen in `arrow_default.rs`, which looks as follows:
31//! ```ignore
32//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
33//!
34//! #[allow(clippy::duplicate_mod)]
35//! #[path = "./arrow_impl.rs"]
36//! mod arrow_impl;
37//! ```
38
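// NOTE(review): `unused_imports` and `dead_code` are allowed file-wide, presumably because this
// template is compiled once per arrow-version module and not every instantiation uses every
// import or item. Confirm before removing these lints.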
40#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53// This is important because we want to use the arrow version specified by the outer mod.
54use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
// All other imports should always use absolute paths.
56use crate::array::*;
57use crate::types::{DataType as RwDataType, Scalar, *};
58use crate::util::iter_util::ZipEqFast;
59
60/// Defines how to convert RisingWave arrays to Arrow arrays.
61///
62/// This trait allows for customized conversion logic for different external systems using Arrow.
63/// The default implementation is based on the `From` implemented in this mod.
64pub trait ToArrow {
65    /// Converts RisingWave `DataChunk` to Arrow `RecordBatch` with specified schema.
66    ///
67    /// This function will try to convert the array if the type is not same with the schema.
68    fn to_record_batch(
69        &self,
70        schema: arrow_schema::SchemaRef,
71        chunk: &DataChunk,
72    ) -> Result<arrow_array::RecordBatch, ArrayError> {
73        // compact the chunk if it's not compacted
74        if !chunk.is_vis_compacted() {
75            let c = chunk.clone();
76            return self.to_record_batch(schema, &c.compact_vis());
77        }
78
79        // convert each column to arrow array
80        let columns: Vec<_> = chunk
81            .columns()
82            .iter()
83            .zip_eq_fast(schema.fields().iter())
84            .map(|(column, field)| self.to_array(field.data_type(), column))
85            .try_collect()?;
86
87        // create record batch
88        let opts =
89            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91            .map_err(ArrayError::to_arrow)
92    }
93
94    /// Converts RisingWave array to Arrow array.
95    fn to_array(
96        &self,
97        data_type: &arrow_schema::DataType,
98        array: &ArrayImpl,
99    ) -> Result<arrow_array::ArrayRef, ArrayError> {
100        let arrow_array = match array {
101            ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102            ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103            ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104            ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105            ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106            ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107            ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108            ArrayImpl::Date(array) => self.date_to_arrow(array),
109            ArrayImpl::Time(array) => self.time_to_arrow(array),
110            ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111            ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112            ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113            ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114            ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115            ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116            ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117            ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118            ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119            ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121            ArrayImpl::Vector(inner) => self.vector_to_arrow(data_type, inner),
122        }?;
123        if arrow_array.data_type() != data_type {
124            arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
125        } else {
126            Ok(arrow_array)
127        }
128    }
129
130    #[inline]
131    fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
132        Ok(Arc::new(arrow_array::BooleanArray::from(array)))
133    }
134
135    #[inline]
136    fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
137        Ok(Arc::new(arrow_array::Int16Array::from(array)))
138    }
139
140    #[inline]
141    fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
142        Ok(Arc::new(arrow_array::Int32Array::from(array)))
143    }
144
145    #[inline]
146    fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
147        Ok(Arc::new(arrow_array::Int64Array::from(array)))
148    }
149
150    #[inline]
151    fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
152        Ok(Arc::new(arrow_array::Float32Array::from(array)))
153    }
154
155    #[inline]
156    fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
157        Ok(Arc::new(arrow_array::Float64Array::from(array)))
158    }
159
160    #[inline]
161    fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
162        Ok(Arc::new(arrow_array::StringArray::from(array)))
163    }
164
165    #[inline]
166    fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
167        Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
168    }
169
170    #[inline]
171    fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
172        Ok(Arc::new(arrow_array::Date32Array::from(array)))
173    }
174
175    #[inline]
176    fn timestamp_to_arrow(
177        &self,
178        array: &TimestampArray,
179    ) -> Result<arrow_array::ArrayRef, ArrayError> {
180        Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
181            array,
182        )))
183    }
184
185    #[inline]
186    fn timestamptz_to_arrow(
187        &self,
188        array: &TimestamptzArray,
189    ) -> Result<arrow_array::ArrayRef, ArrayError> {
190        Ok(Arc::new(
191            arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
192        ))
193    }
194
195    #[inline]
196    fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
197        Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
198    }
199
200    #[inline]
201    fn interval_to_arrow(
202        &self,
203        array: &IntervalArray,
204    ) -> Result<arrow_array::ArrayRef, ArrayError> {
205        Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
206            array,
207        )))
208    }
209
210    #[inline]
211    fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
212        Ok(Arc::new(arrow_array::BinaryArray::from(array)))
213    }
214
215    // Decimal values are stored as ASCII text representation in a string array.
216    #[inline]
217    fn decimal_to_arrow(
218        &self,
219        _data_type: &arrow_schema::DataType,
220        array: &DecimalArray,
221    ) -> Result<arrow_array::ArrayRef, ArrayError> {
222        Ok(Arc::new(arrow_array::StringArray::from(array)))
223    }
224
225    // JSON values are stored as text representation in a string array.
226    #[inline]
227    fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
228        Ok(Arc::new(arrow_array::StringArray::from(array)))
229    }
230
231    #[inline]
232    fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
233        Ok(Arc::new(arrow_array::Int64Array::from(array)))
234    }
235
236    #[inline]
237    fn list_to_arrow(
238        &self,
239        data_type: &arrow_schema::DataType,
240        array: &ListArray,
241    ) -> Result<arrow_array::ArrayRef, ArrayError> {
242        let arrow_schema::DataType::List(field) = data_type else {
243            return Err(ArrayError::to_arrow("Invalid list type"));
244        };
245        let values = self.to_array(field.data_type(), array.values())?;
246        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
247        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
248        Ok(Arc::new(arrow_array::ListArray::new(
249            field.clone(),
250            offsets,
251            values,
252            nulls,
253        )))
254    }
255
256    #[inline]
257    fn vector_to_arrow(
258        &self,
259        data_type: &arrow_schema::DataType,
260        array: &VectorArray,
261    ) -> Result<arrow_array::ArrayRef, ArrayError> {
262        let arrow_schema::DataType::List(field) = data_type else {
263            return Err(ArrayError::to_arrow("Invalid list type"));
264        };
265        if field.data_type() != &arrow_schema::DataType::Float32 {
266            return Err(ArrayError::to_arrow("Invalid list inner type for vector"));
267        }
268        let values = Arc::new(arrow_array::Float32Array::from(
269            array.as_raw_slice().to_vec(),
270        ));
271        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
272        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
273        Ok(Arc::new(arrow_array::ListArray::new(
274            field.clone(),
275            offsets,
276            values,
277            nulls,
278        )))
279    }
280
281    #[inline]
282    fn struct_to_arrow(
283        &self,
284        data_type: &arrow_schema::DataType,
285        array: &StructArray,
286    ) -> Result<arrow_array::ArrayRef, ArrayError> {
287        let arrow_schema::DataType::Struct(fields) = data_type else {
288            return Err(ArrayError::to_arrow("Invalid struct type"));
289        };
290        // Use `try_new_with_length` so that empty-field structs keep their row count;
291        // `StructArray::new` panics for empty `fields` because it derives length from
292        // the child arrays.
293        let len = array.len();
294        let child_arrays = array
295            .fields()
296            .zip_eq_fast(fields)
297            .map(|(arr, field)| self.to_array(field.data_type(), arr))
298            .try_collect::<_, _, ArrayError>()?;
299        let nulls = Some(array.null_bitmap().into());
300        Ok(Arc::new(
301            arrow_array::StructArray::try_new_with_length(fields.clone(), child_arrays, nulls, len)
302                .map_err(ArrayError::from_arrow)?,
303        ))
304    }
305
306    #[inline]
307    fn map_to_arrow(
308        &self,
309        data_type: &arrow_schema::DataType,
310        array: &MapArray,
311    ) -> Result<arrow_array::ArrayRef, ArrayError> {
312        let arrow_schema::DataType::Map(field, ordered) = data_type else {
313            return Err(ArrayError::to_arrow("Invalid map type"));
314        };
315        if *ordered {
316            return Err(ArrayError::to_arrow("Sorted map is not supported"));
317        }
318        let values = self
319            .struct_to_arrow(field.data_type(), array.as_struct())?
320            .as_struct()
321            .clone();
322        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
323        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
324        Ok(Arc::new(arrow_array::MapArray::new(
325            field.clone(),
326            offsets,
327            values,
328            nulls,
329            *ordered,
330        )))
331    }
332
333    /// Convert RisingWave data type to Arrow data type.
334    ///
335    /// This function returns a `Field` instead of `DataType` because some may be converted to
336    /// extension types which require additional metadata in the field.
337    fn to_arrow_field(
338        &self,
339        name: &str,
340        value: &DataType,
341    ) -> Result<arrow_schema::Field, ArrayError> {
342        let data_type = match value {
343            // using the inline function
344            DataType::Boolean => self.bool_type_to_arrow(),
345            DataType::Int16 => self.int16_type_to_arrow(),
346            DataType::Int32 => self.int32_type_to_arrow(),
347            DataType::Int64 => self.int64_type_to_arrow(),
348            DataType::Int256 => self.int256_type_to_arrow(),
349            DataType::Float32 => self.float32_type_to_arrow(),
350            DataType::Float64 => self.float64_type_to_arrow(),
351            DataType::Date => self.date_type_to_arrow(),
352            DataType::Time => self.time_type_to_arrow(),
353            DataType::Timestamp => self.timestamp_type_to_arrow(),
354            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
355            DataType::Interval => self.interval_type_to_arrow(),
356            DataType::Varchar => self.varchar_type_to_arrow(),
357            DataType::Bytea => self.bytea_type_to_arrow(),
358            DataType::Serial => self.serial_type_to_arrow(),
359            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
360            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
361            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
362            DataType::List(list) => self.list_type_to_arrow(list)?,
363            DataType::Map(map) => self.map_type_to_arrow(map)?,
364            DataType::Vector(_) => self.vector_type_to_arrow()?,
365        };
366        Ok(arrow_schema::Field::new(name, data_type, true))
367    }
368
369    #[inline]
370    fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
371        arrow_schema::DataType::Boolean
372    }
373
374    #[inline]
375    fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
376        arrow_schema::DataType::Int16
377    }
378
379    #[inline]
380    fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
381        arrow_schema::DataType::Int32
382    }
383
384    #[inline]
385    fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
386        arrow_schema::DataType::Int64
387    }
388
389    #[inline]
390    fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
391        arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
392    }
393
394    #[inline]
395    fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
396        arrow_schema::DataType::Float32
397    }
398
399    #[inline]
400    fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
401        arrow_schema::DataType::Float64
402    }
403
404    #[inline]
405    fn date_type_to_arrow(&self) -> arrow_schema::DataType {
406        arrow_schema::DataType::Date32
407    }
408
409    #[inline]
410    fn time_type_to_arrow(&self) -> arrow_schema::DataType {
411        arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
412    }
413
414    #[inline]
415    fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
416        arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
417    }
418
419    #[inline]
420    fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
421        arrow_schema::DataType::Timestamp(
422            arrow_schema::TimeUnit::Microsecond,
423            Some("+00:00".into()),
424        )
425    }
426
427    #[inline]
428    fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
429        arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
430    }
431
432    #[inline]
433    fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
434        arrow_schema::DataType::Utf8
435    }
436
437    #[inline]
438    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
439        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
440            .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
441    }
442
443    #[inline]
444    fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
445        arrow_schema::DataType::Binary
446    }
447
448    #[inline]
449    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
450        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
451            .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
452    }
453
454    #[inline]
455    fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
456        arrow_schema::DataType::Int64
457    }
458
459    #[inline]
460    fn list_type_to_arrow(
461        &self,
462        list_type: &ListType,
463    ) -> Result<arrow_schema::DataType, ArrayError> {
464        Ok(arrow_schema::DataType::List(Arc::new(
465            self.to_arrow_field("item", list_type.elem())?,
466        )))
467    }
468
469    #[inline]
470    fn struct_type_to_arrow(
471        &self,
472        fields: &StructType,
473    ) -> Result<arrow_schema::DataType, ArrayError> {
474        Ok(arrow_schema::DataType::Struct(
475            fields
476                .iter()
477                .map(|(name, ty)| self.to_arrow_field(name, ty))
478                .try_collect::<_, _, ArrayError>()?,
479        ))
480    }
481
482    #[inline]
483    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
484        let sorted = false;
485        // "key" is always non-null
486        let key = self
487            .to_arrow_field("key", map_type.key())?
488            .with_nullable(false);
489        let value = self.to_arrow_field("value", map_type.value())?;
490        Ok(arrow_schema::DataType::Map(
491            Arc::new(arrow_schema::Field::new(
492                "entries",
493                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
494                // "entries" is always non-null
495                false,
496            )),
497            sorted,
498        ))
499    }
500
501    #[inline]
502    fn vector_type_to_arrow(&self) -> Result<arrow_schema::DataType, ArrayError> {
503        Ok(arrow_schema::DataType::List(Arc::new(
504            self.to_arrow_field("item", &VECTOR_ITEM_TYPE)?,
505        )))
506    }
507}
508
509/// Defines how to convert Arrow arrays to RisingWave arrays.
510#[allow(clippy::wrong_self_convention)]
511pub trait FromArrow {
512    /// Converts Arrow `RecordBatch` to RisingWave `DataChunk`.
513    fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
514        let mut columns = Vec::with_capacity(batch.num_columns());
515        for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
516            let column = Arc::new(self.from_array(field, array)?);
517            columns.push(column);
518        }
519        Ok(DataChunk::new(columns, batch.num_rows()))
520    }
521
522    /// Converts Arrow `Fields` to RisingWave `StructType`.
523    fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
524        Ok(StructType::new(
525            fields
526                .iter()
527                .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
528                .try_collect::<_, Vec<_>, ArrayError>()?,
529        ))
530    }
531
532    /// Converts Arrow `Field` to RisingWave `DataType`.
533    fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
534        use arrow_schema::DataType::*;
535        use arrow_schema::IntervalUnit::*;
536        use arrow_schema::TimeUnit::*;
537
538        // extension type
539        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
540            return self.from_extension_type(type_name, field.data_type());
541        }
542
543        Ok(match field.data_type() {
544            Boolean => DataType::Boolean,
545            Int16 => DataType::Int16,
546            Int32 => DataType::Int32,
547            Int64 => DataType::Int64,
548            Int8 => DataType::Int16,
549            UInt8 => DataType::Int16,
550            UInt16 => DataType::Int32,
551            UInt32 => DataType::Int64,
552            UInt64 => DataType::Decimal,
553            Float16 => DataType::Float32,
554            Float32 => DataType::Float32,
555            Float64 => DataType::Float64,
556            Decimal128(_, _) => DataType::Decimal,
557            Decimal256(_, _) => DataType::Int256,
558            Date32 => DataType::Date,
559            Time64(Microsecond) => DataType::Time,
560            Timestamp(Microsecond, None) => DataType::Timestamp,
561            Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
562            Timestamp(Second, None) => DataType::Timestamp,
563            Timestamp(Second, Some(_)) => DataType::Timestamptz,
564            Timestamp(Millisecond, None) => DataType::Timestamp,
565            Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
566            Timestamp(Nanosecond, None) => DataType::Timestamp,
567            Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
568            Interval(MonthDayNano) => DataType::Interval,
569            Utf8 => DataType::Varchar,
570            Utf8View => DataType::Varchar,
571            Binary => DataType::Bytea,
572            // Iceberg `uuid` maps to `FixedSizeBinary(16)` and `fixed[L]` maps to
573            // `FixedSizeBinary(L)`. Both are represented as `Bytea` in RisingWave.
574            FixedSizeBinary(_) => self.from_fixed_size_binary()?,
575            LargeUtf8 => self.from_large_utf8()?,
576            LargeBinary => self.from_large_binary()?,
577            List(field) => DataType::list(self.from_field(field)?),
578            Struct(fields) => DataType::Struct(self.from_fields(fields)?),
579            Map(field, _is_sorted) => {
580                let entries = self.from_field(field)?;
581                DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
582                    ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
583                })?)
584            }
585            t => {
586                return Err(ArrayError::from_arrow(format!(
587                    "unsupported arrow data type: {t:?}"
588                )));
589            }
590        })
591    }
592
593    /// Converts Arrow `LargeUtf8` type to RisingWave data type.
594    fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
595        Ok(DataType::Varchar)
596    }
597
598    /// Converts Arrow `LargeBinary` type to RisingWave data type.
599    fn from_large_binary(&self) -> Result<DataType, ArrayError> {
600        Ok(DataType::Bytea)
601    }
602
603    /// Converts Arrow `FixedSizeBinary` type to RisingWave data type.
604    fn from_fixed_size_binary(&self) -> Result<DataType, ArrayError> {
605        Ok(DataType::Bytea)
606    }
607
608    /// Converts Arrow extension type to RisingWave `DataType`.
609    fn from_extension_type(
610        &self,
611        type_name: &str,
612        physical_type: &arrow_schema::DataType,
613    ) -> Result<DataType, ArrayError> {
614        match (type_name, physical_type) {
615            ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
616            ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
617            _ => Err(ArrayError::from_arrow(format!(
618                "unsupported extension type: {type_name:?}"
619            ))),
620        }
621    }
622
623    /// Converts Arrow `Array` to RisingWave `ArrayImpl`.
624    fn from_array(
625        &self,
626        field: &arrow_schema::Field,
627        array: &arrow_array::ArrayRef,
628    ) -> Result<ArrayImpl, ArrayError> {
629        use arrow_schema::DataType::*;
630        use arrow_schema::IntervalUnit::*;
631        use arrow_schema::TimeUnit::*;
632
633        // extension type
634        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
635            return self.from_extension_array(type_name, array);
636        }
637
638        // Struct projection for file source (Parquet): allow Arrow struct to be a superset of the
639        // expected struct fields. We align fields by name and ignore extra fields.
640        //
641        // Only use projection when Arrow struct differs from expected (superset, reordered, or
642        // different field names). If they match exactly, fall through to the normal path to
643        // avoid unnecessary overhead and potential issues with UDF/other paths.
644        if let (
645            arrow_schema::DataType::Struct(expected_fields),
646            arrow_schema::DataType::Struct(actual_fields),
647        ) = (field.data_type(), array.data_type())
648        {
649            let dominated = Self::struct_fields_dominated(expected_fields, actual_fields);
650            if dominated {
651                let struct_array: &arrow_array::StructArray =
652                    array.as_any().downcast_ref().unwrap();
653                return self.from_struct_array_projected(expected_fields, struct_array);
654            }
655            // else: fields match exactly, fall through to normal Struct(_) path below
656        }
657        match array.data_type() {
658            Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
659            Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
660            Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
661            Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
662            Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
663            UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
664            UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
665            UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
666
667            UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
668            Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
669            Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
670            Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
671            Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
672            Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
673            Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
674            Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
675            Timestamp(Second, None) => {
676                self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
677            }
678            Timestamp(Second, Some(_)) => {
679                self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
680            }
681            Timestamp(Millisecond, None) => {
682                self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
683            }
684            Timestamp(Millisecond, Some(_)) => {
685                self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
686            }
687            Timestamp(Microsecond, None) => {
688                self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
689            }
690            Timestamp(Microsecond, Some(_)) => {
691                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
692            }
693            Timestamp(Nanosecond, None) => {
694                self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
695            }
696            Timestamp(Nanosecond, Some(_)) => {
697                self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
698            }
699            Interval(MonthDayNano) => {
700                self.from_interval_array(array.as_any().downcast_ref().unwrap())
701            }
702            Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
703            Utf8View => self.from_utf8_view_array(array.as_any().downcast_ref().unwrap()),
704            Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
705            FixedSizeBinary(_) => {
706                self.from_fixed_size_binary_array(array.as_any().downcast_ref().unwrap())
707            }
708            LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
709            LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
710            List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
711            Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
712            Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
713            t => Err(ArrayError::from_arrow(format!(
714                "unsupported arrow data type: {t:?}",
715            ))),
716        }
717    }
718
719    /// Converts Arrow extension array to RisingWave `ArrayImpl`.
720    fn from_extension_array(
721        &self,
722        type_name: &str,
723        array: &arrow_array::ArrayRef,
724    ) -> Result<ArrayImpl, ArrayError> {
725        match type_name {
726            "arrowudf.decimal" => {
727                let array: &arrow_array::StringArray =
728                    array.as_any().downcast_ref().ok_or_else(|| {
729                        ArrayError::from_arrow(
730                            "expected string array for `arrowudf.decimal`".to_owned(),
731                        )
732                    })?;
733                Ok(ArrayImpl::Decimal(array.try_into()?))
734            }
735            "arrowudf.json" => {
736                let array: &arrow_array::StringArray =
737                    array.as_any().downcast_ref().ok_or_else(|| {
738                        ArrayError::from_arrow(
739                            "expected string array for `arrowudf.json`".to_owned(),
740                        )
741                    })?;
742                Ok(ArrayImpl::Jsonb(array.try_into()?))
743            }
744            _ => Err(ArrayError::from_arrow(format!(
745                "unsupported extension type: {type_name:?}"
746            ))),
747        }
748    }
749
750    fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
751        Ok(ArrayImpl::Bool(array.into()))
752    }
753
754    fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
755        Ok(ArrayImpl::Int16(array.into()))
756    }
757
758    fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
759        Ok(ArrayImpl::Int16(array.into()))
760    }
761
762    fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
763        Ok(ArrayImpl::Int16(array.into()))
764    }
765
766    fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
767        Ok(ArrayImpl::Int32(array.into()))
768    }
769
770    fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
771        Ok(ArrayImpl::Int64(array.into()))
772    }
773
774    fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
775        Ok(ArrayImpl::Int32(array.into()))
776    }
777
778    fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
779        Ok(ArrayImpl::Int64(array.into()))
780    }
781
782    fn from_int256_array(
783        &self,
784        array: &arrow_array::Decimal256Array,
785    ) -> Result<ArrayImpl, ArrayError> {
786        Ok(ArrayImpl::Int256(array.into()))
787    }
788
789    fn from_decimal128_array(
790        &self,
791        array: &arrow_array::Decimal128Array,
792    ) -> Result<ArrayImpl, ArrayError> {
793        Ok(ArrayImpl::Decimal(array.try_into()?))
794    }
795
796    fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
797        Ok(ArrayImpl::Decimal(array.try_into()?))
798    }
799
800    fn from_float16_array(
801        &self,
802        array: &arrow_array::Float16Array,
803    ) -> Result<ArrayImpl, ArrayError> {
804        Ok(ArrayImpl::Float32(array.try_into()?))
805    }
806
807    fn from_float32_array(
808        &self,
809        array: &arrow_array::Float32Array,
810    ) -> Result<ArrayImpl, ArrayError> {
811        Ok(ArrayImpl::Float32(array.into()))
812    }
813
814    fn from_float64_array(
815        &self,
816        array: &arrow_array::Float64Array,
817    ) -> Result<ArrayImpl, ArrayError> {
818        Ok(ArrayImpl::Float64(array.into()))
819    }
820
821    fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
822        Ok(ArrayImpl::Date(array.into()))
823    }
824
825    fn from_time64us_array(
826        &self,
827        array: &arrow_array::Time64MicrosecondArray,
828    ) -> Result<ArrayImpl, ArrayError> {
829        Ok(ArrayImpl::Time(array.into()))
830    }
831
832    fn from_timestampsecond_array(
833        &self,
834        array: &arrow_array::TimestampSecondArray,
835    ) -> Result<ArrayImpl, ArrayError> {
836        Ok(ArrayImpl::Timestamp(array.into()))
837    }
838    fn from_timestampsecond_some_array(
839        &self,
840        array: &arrow_array::TimestampSecondArray,
841    ) -> Result<ArrayImpl, ArrayError> {
842        Ok(ArrayImpl::Timestamptz(array.into()))
843    }
844
845    fn from_timestampms_array(
846        &self,
847        array: &arrow_array::TimestampMillisecondArray,
848    ) -> Result<ArrayImpl, ArrayError> {
849        Ok(ArrayImpl::Timestamp(array.into()))
850    }
851
852    fn from_timestampms_some_array(
853        &self,
854        array: &arrow_array::TimestampMillisecondArray,
855    ) -> Result<ArrayImpl, ArrayError> {
856        Ok(ArrayImpl::Timestamptz(array.into()))
857    }
858
859    fn from_timestampus_array(
860        &self,
861        array: &arrow_array::TimestampMicrosecondArray,
862    ) -> Result<ArrayImpl, ArrayError> {
863        Ok(ArrayImpl::Timestamp(array.into()))
864    }
865
866    fn from_timestampus_some_array(
867        &self,
868        array: &arrow_array::TimestampMicrosecondArray,
869    ) -> Result<ArrayImpl, ArrayError> {
870        Ok(ArrayImpl::Timestamptz(array.into()))
871    }
872
873    fn from_timestampns_array(
874        &self,
875        array: &arrow_array::TimestampNanosecondArray,
876    ) -> Result<ArrayImpl, ArrayError> {
877        Ok(ArrayImpl::Timestamp(array.into()))
878    }
879
880    fn from_timestampns_some_array(
881        &self,
882        array: &arrow_array::TimestampNanosecondArray,
883    ) -> Result<ArrayImpl, ArrayError> {
884        Ok(ArrayImpl::Timestamptz(array.into()))
885    }
886
887    fn from_interval_array(
888        &self,
889        array: &arrow_array::IntervalMonthDayNanoArray,
890    ) -> Result<ArrayImpl, ArrayError> {
891        Ok(ArrayImpl::Interval(array.into()))
892    }
893
894    fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
895        Ok(ArrayImpl::Utf8(array.into()))
896    }
897
898    fn from_utf8_view_array(
899        &self,
900        array: &arrow_array::StringViewArray,
901    ) -> Result<ArrayImpl, ArrayError> {
902        Ok(ArrayImpl::Utf8(array.into()))
903    }
904
905    fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
906        Ok(ArrayImpl::Bytea(array.into()))
907    }
908
909    fn from_large_utf8_array(
910        &self,
911        array: &arrow_array::LargeStringArray,
912    ) -> Result<ArrayImpl, ArrayError> {
913        Ok(ArrayImpl::Utf8(array.into()))
914    }
915
916    fn from_large_binary_array(
917        &self,
918        array: &arrow_array::LargeBinaryArray,
919    ) -> Result<ArrayImpl, ArrayError> {
920        Ok(ArrayImpl::Bytea(array.into()))
921    }
922
923    fn from_fixed_size_binary_array(
924        &self,
925        array: &arrow_array::FixedSizeBinaryArray,
926    ) -> Result<ArrayImpl, ArrayError> {
927        Ok(ArrayImpl::Bytea(array.iter().collect()))
928    }
929
930    fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
931        use arrow_array::Array;
932        let arrow_schema::DataType::List(field) = array.data_type() else {
933            panic!("nested field types cannot be determined.");
934        };
935        Ok(ArrayImpl::List(ListArray {
936            value: Box::new(self.from_array(field, array.values())?),
937            bitmap: match array.nulls() {
938                Some(nulls) => nulls.iter().collect(),
939                None => Bitmap::ones(array.len()),
940            },
941            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
942        }))
943    }
944
945    fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
946        use arrow_array::Array;
947        let arrow_schema::DataType::Struct(fields) = array.data_type() else {
948            panic!("nested field types cannot be determined.");
949        };
950        Ok(ArrayImpl::Struct(StructArray::new(
951            self.from_fields(fields)?,
952            array
953                .columns()
954                .iter()
955                .zip_eq_fast(fields)
956                .map(|(array, field)| self.from_array(field, array).map(Arc::new))
957                .try_collect()?,
958            (0..array.len()).map(|i| array.is_valid(i)).collect(),
959        )))
960    }
961
962    /// Returns `true` if all expected fields are present in `actual_fields`, and `actual_fields`
963    /// has more fields or has them in a different order.
964    ///
965    /// This is used to decide whether to use `from_struct_array_projected` (projection needed)
966    /// or fall back to the normal `from_struct_array` path (exact match).
967    fn struct_fields_dominated(
968        expected_fields: &arrow_schema::Fields,
969        actual_fields: &arrow_schema::Fields,
970    ) -> bool {
971        // Fast path: if lengths are equal and names match in order, no projection needed
972        if expected_fields.len() == actual_fields.len() {
973            let all_match = expected_fields
974                .iter()
975                .zip_eq_fast(actual_fields.iter())
976                .all(|(e, a)| e.name() == a.name());
977            if all_match {
978                return false; // exact match, use normal path
979            }
980        }
981        // Check that all expected fields exist in actual (by name)
982        let actual_names: std::collections::HashSet<&str> =
983            actual_fields.iter().map(|f| f.name().as_str()).collect();
984        expected_fields
985            .iter()
986            .all(|e| actual_names.contains(e.name().as_str()))
987    }
988
989    /// Converts Arrow `StructArray` to RisingWave `StructArray` according to the expected fields.
990    ///
991    /// This is mainly used for Parquet file source, where the upstream struct may contain extra
992    /// fields. The conversion aligns fields by name, ignores extra fields, and keeps the expected
993    /// field order.
994    fn from_struct_array_projected(
995        &self,
996        expected_fields: &arrow_schema::Fields,
997        array: &arrow_array::StructArray,
998    ) -> Result<ArrayImpl, ArrayError> {
999        use std::collections::HashMap;
1000
1001        use arrow_array::Array;
1002
1003        let arrow_schema::DataType::Struct(actual_fields) = array.data_type() else {
1004            panic!("nested field types cannot be determined.");
1005        };
1006
1007        let actual_name_to_index: HashMap<&str, usize> = actual_fields
1008            .iter()
1009            .enumerate()
1010            .map(|(idx, f)| (f.name().as_str(), idx))
1011            .collect();
1012
1013        let len = array.len();
1014        let projected_columns = expected_fields
1015            .iter()
1016            .map(|expected_field| {
1017                if let Some(&idx) = actual_name_to_index.get(expected_field.name().as_str()) {
1018                    let child = array.columns()[idx].clone();
1019                    self.from_array(expected_field, &child).map(Arc::new)
1020                } else {
1021                    // Field missing in Arrow struct. Fill SQL NULL with the expected RW type.
1022                    let rw_ty = self.from_field(expected_field)?;
1023                    let mut builder = ArrayBuilderImpl::with_type(len, rw_ty);
1024                    builder.append_n(len, Datum::None);
1025                    Ok(Arc::new(builder.finish()))
1026                }
1027            })
1028            .try_collect()?;
1029
1030        Ok(ArrayImpl::Struct(StructArray::new(
1031            self.from_fields(expected_fields)?,
1032            projected_columns,
1033            (0..len).map(|i| array.is_valid(i)).collect(),
1034        )))
1035    }
1036
1037    fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
1038        use arrow_array::Array;
1039        let struct_array = self.from_struct_array(array.entries())?;
1040        let list_array = ListArray {
1041            value: Box::new(struct_array),
1042            bitmap: match array.nulls() {
1043                Some(nulls) => nulls.iter().collect(),
1044                None => Bitmap::ones(array.len()),
1045            },
1046            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
1047        };
1048
1049        Ok(ArrayImpl::Map(MapArray { inner: list_array }))
1050    }
1051}
1052
1053impl From<&Bitmap> for arrow_buffer::NullBuffer {
1054    fn from(bitmap: &Bitmap) -> Self {
1055        bitmap.iter().collect()
1056    }
1057}
1058
/// Implements bi-directional `From` between a RisingWave array type and an Arrow array type.
///
/// - `converts!(Rw, Arrow)`: element types line up directly, so both directions just collect
///   the other side's iterator.
/// - `converts!(Rw, Arrow, @map)`: each non-null element is translated through
///   `FromIntoArrow`.
///
/// Both forms also implement `From<&[Arrow]>` for the RisingWave type, concatenating chunks.
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(rw: &$ArrayType) -> Self {
                rw.iter().collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(arrow: &$ArrowType) -> Self {
                arrow.iter().collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .collect::<Self>()
            }
        }
    };
    // convert values using FromIntoArrow
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(rw: &$ArrayType) -> Self {
                rw.iter()
                    .map(|slot| slot.map(|value| value.into_arrow()))
                    .collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(arrow: &$ArrowType) -> Self {
                arrow
                    .iter()
                    .map(|slot| {
                        slot.map(|value| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                        })
                    })
                    .collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|slot| {
                        slot.map(|value| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(value)
                        })
                    })
                    .collect::<Self>()
            }
        }
    };
}
1112
/// Used to convert different types.
///
/// Implements conversions between a RisingWave primitive array `$ArrayType` (element type
/// `$FromType`) and an Arrow primitive array `$ArrowType` (element type `$ToType`) whose
/// element types differ. Every non-null element is converted with an `as` cast:
/// - Arrow -> RisingWave (`From<&$ArrowType>`, `From<&[$ArrowType]>`): cast to `$FromType`.
/// - RisingWave -> Arrow (`From<&$ArrayType>`): cast to `$ToType`.
///
/// NOTE: `as` never fails. In the RisingWave -> Arrow direction, values that do not fit in
/// `$ToType` (e.g. an `i16` outside the `i8` range) are silently truncated/wrapped.
///
/// The elements are collected straight into the target array, without an intermediate `Vec`.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
1143
/// Implements bi-directional `From` between a RisingWave timestamp array and an Arrow
/// timestamp array of one fixed time unit.
///
/// Each non-null element is converted through `FromIntoArrowWithUnit` using `$time_unit`.
/// A `From<&[Arrow]>` impl that concatenates chunks is generated as well.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array
                    .iter()
                    .map(|o| o.map(|v| v.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array
                    .iter()
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter())
                    .map(|o| {
                        o.map(|v| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1180
1181converts!(BoolArray, arrow_array::BooleanArray);
1182converts!(I16Array, arrow_array::Int16Array);
1183converts!(I32Array, arrow_array::Int32Array);
1184converts!(I64Array, arrow_array::Int64Array);
1185converts!(F32Array, arrow_array::Float32Array, @map);
1186converts!(F64Array, arrow_array::Float64Array, @map);
1187converts!(BytesArray, arrow_array::BinaryArray);
1188converts!(BytesArray, arrow_array::LargeBinaryArray);
1189converts!(Utf8Array, arrow_array::StringArray);
1190converts!(Utf8Array, arrow_array::LargeStringArray);
1191converts!(Utf8Array, arrow_array::StringViewArray);
1192converts!(DateArray, arrow_array::Date32Array, @map);
1193converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
1194converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
1195converts!(SerialArray, arrow_array::Int64Array, @map);
1196
1197converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
1198converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
1199converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
1200converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);
1201
1202converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1203converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
1204converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1205converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1206
1207converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
1208converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
1209converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
1210converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1211
/// Converts RisingWave value from and into Arrow value.
///
/// Each implementor pairs a RisingWave scalar with the native element type of the Arrow array
/// it is stored in, so whole arrays can be converted element by element.
trait FromIntoArrow {
    /// The corresponding element type in the Arrow array.
    type ArrowType;

    /// Builds the RisingWave value from an Arrow element.
    fn from_arrow(arrow_value: Self::ArrowType) -> Self;

    /// Turns the RisingWave value into an Arrow element.
    fn into_arrow(self) -> Self::ArrowType;
}
1219
/// Converts RisingWave value from and into Arrow value.
/// Specifically used for converting timestamp types according to timeunit.
///
/// Unlike `FromIntoArrow`, the Arrow element alone is not enough to decode the value: the
/// caller also supplies the unit the element is expressed in.
trait FromIntoArrowWithUnit {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// The time-unit descriptor passed to both conversions (the impls here use Arrow's
    /// `TimeUnit`).
    type TimestampType;

    /// Builds the RisingWave value from an Arrow element expressed in `time_unit`.
    fn from_arrow_with_unit(arrow_value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;

    /// Turns the RisingWave value into an Arrow element expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1229
1230impl FromIntoArrow for Serial {
1231    type ArrowType = i64;
1232
1233    fn from_arrow(value: Self::ArrowType) -> Self {
1234        value.into()
1235    }
1236
1237    fn into_arrow(self) -> Self::ArrowType {
1238        self.into()
1239    }
1240}
1241
1242impl FromIntoArrow for F32 {
1243    type ArrowType = f32;
1244
1245    fn from_arrow(value: Self::ArrowType) -> Self {
1246        value.into()
1247    }
1248
1249    fn into_arrow(self) -> Self::ArrowType {
1250        self.into()
1251    }
1252}
1253
1254impl FromIntoArrow for F64 {
1255    type ArrowType = f64;
1256
1257    fn from_arrow(value: Self::ArrowType) -> Self {
1258        value.into()
1259    }
1260
1261    fn into_arrow(self) -> Self::ArrowType {
1262        self.into()
1263    }
1264}
1265
1266impl FromIntoArrow for Date {
1267    type ArrowType = i32;
1268
1269    #[allow(deprecated)]
1270    fn from_arrow(value: Self::ArrowType) -> Self {
1271        Date(arrow_array::types::Date32Type::to_naive_date(value))
1272    }
1273
1274    fn into_arrow(self) -> Self::ArrowType {
1275        arrow_array::types::Date32Type::from_naive_date(self.0)
1276    }
1277}
1278
1279impl FromIntoArrow for Time {
1280    type ArrowType = i64;
1281
1282    fn from_arrow(value: Self::ArrowType) -> Self {
1283        Time(
1284            NaiveTime::from_num_seconds_from_midnight_opt(
1285                (value / 1_000_000) as _,
1286                (value % 1_000_000 * 1000) as _,
1287            )
1288            .unwrap(),
1289        )
1290    }
1291
1292    fn into_arrow(self) -> Self::ArrowType {
1293        self.0
1294            .signed_duration_since(NaiveTime::default())
1295            .num_microseconds()
1296            .unwrap()
1297    }
1298}
1299
1300impl FromIntoArrowWithUnit for Timestamp {
1301    type ArrowType = i64;
1302    type TimestampType = TimeUnit;
1303
1304    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1305        match time_unit {
1306            TimeUnit::Second => {
1307                Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1308            }
1309            TimeUnit::Millisecond => {
1310                Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1311            }
1312            TimeUnit::Microsecond => {
1313                Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1314            }
1315            TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1316        }
1317    }
1318
1319    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1320        match time_unit {
1321            TimeUnit::Second => self.0.and_utc().timestamp(),
1322            TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1323            TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1324            TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1325        }
1326    }
1327}
1328
1329impl FromIntoArrowWithUnit for Timestamptz {
1330    type ArrowType = i64;
1331    type TimestampType = TimeUnit;
1332
1333    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1334        match time_unit {
1335            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1336            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1337            TimeUnit::Microsecond => Timestamptz::from_micros(value),
1338            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1339        }
1340    }
1341
1342    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1343        match time_unit {
1344            TimeUnit::Second => self.timestamp(),
1345            TimeUnit::Millisecond => self.timestamp_millis(),
1346            TimeUnit::Microsecond => self.timestamp_micros(),
1347            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1348        }
1349    }
1350}
1351
1352impl FromIntoArrow for Interval {
1353    type ArrowType = ArrowIntervalType;
1354
1355    fn from_arrow(value: Self::ArrowType) -> Self {
1356        Interval::from_month_day_usec(value.months, value.days, value.nanoseconds / 1000)
1357    }
1358
1359    fn into_arrow(self) -> Self::ArrowType {
1360        ArrowIntervalType {
1361            months: self.months(),
1362            days: self.days(),
1363            // TODO: this may overflow and we need `try_into`
1364            nanoseconds: self.usecs() * 1000,
1365        }
1366    }
1367}
1368
1369impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1370    fn from(array: &DecimalArray) -> Self {
1371        let mut builder =
1372            arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1373        for value in array.iter() {
1374            builder.append_option(value.map(|d| d.to_string()));
1375        }
1376        builder.finish()
1377    }
1378}
1379
1380impl From<&DecimalArray> for arrow_array::StringArray {
1381    fn from(array: &DecimalArray) -> Self {
1382        let mut builder =
1383            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1384        for value in array.iter() {
1385            builder.append_option(value.map(|d| d.to_string()));
1386        }
1387        builder.finish()
1388    }
1389}
1390
1391// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
1392impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1393    type Error = ArrayError;
1394
1395    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1396        if array.scale() < 0 {
1397            bail!("support negative scale for arrow decimal")
1398        }
1399
1400        // Calculate the max value based on the Arrow decimal's precision
1401        // When writing Inf to Arrow Decimal128(precision, scale), we use 10^precision - 1
1402        let precision = array.precision();
1403        let max_value = 10_i128.pow(precision as u32) - 1;
1404
1405        let from_arrow = |value| {
1406            const NAN: i128 = i128::MIN + 1;
1407            let res = match value {
1408                // Check for special values using Arrow Decimal's max value, not i128::MAX
1409                NAN => Decimal::NaN,
1410                v if v == max_value => Decimal::PositiveInf,
1411                v if v == -max_value => Decimal::NegativeInf,
1412                i128::MAX => Decimal::PositiveInf, // Fallback for old data
1413                i128::MIN => Decimal::NegativeInf, // Fallback for old data
1414                _ => Decimal::truncated_i128_and_scale(value, array.scale() as u32)
1415                    .ok_or_else(|| ArrayError::from_arrow("decimal overflow"))?,
1416            };
1417            Ok(res)
1418        };
1419        array
1420            .iter()
1421            .map(|o| o.map(from_arrow).transpose())
1422            .collect::<Result<Self, Self::Error>>()
1423    }
1424}
1425
1426// Since RisingWave does not support UInt type, convert UInt64Array to Decimal.
1427impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1428    type Error = ArrayError;
1429
1430    fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1431        let from_arrow = |value| {
1432            // Convert the value to a Decimal with scale 0
1433            let res = Decimal::from(value);
1434            Ok(res)
1435        };
1436
1437        // Map over the array and convert each value
1438        array
1439            .iter()
1440            .map(|o| o.map(from_arrow).transpose())
1441            .collect::<Result<Self, Self::Error>>()
1442    }
1443}
1444
1445impl TryFrom<&arrow_array::Float16Array> for F32Array {
1446    type Error = ArrayError;
1447
1448    fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1449        let from_arrow = |value| Ok(f32::from(value));
1450
1451        array
1452            .iter()
1453            .map(|o| o.map(from_arrow).transpose())
1454            .collect::<Result<Self, Self::Error>>()
1455    }
1456}
1457
1458impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1459    type Error = ArrayError;
1460
1461    fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1462        array
1463            .iter()
1464            .map(|o| {
1465                o.map(|s| {
1466                    let s = std::str::from_utf8(s)
1467                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1468                    s.parse()
1469                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1470                })
1471                .transpose()
1472            })
1473            .try_collect()
1474    }
1475}
1476
1477impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1478    type Error = ArrayError;
1479
1480    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1481        array
1482            .iter()
1483            .map(|o| {
1484                o.map(|s| {
1485                    s.parse()
1486                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1487                })
1488                .transpose()
1489            })
1490            .try_collect()
1491    }
1492}
1493
1494impl From<&JsonbArray> for arrow_array::StringArray {
1495    fn from(array: &JsonbArray) -> Self {
1496        let mut builder =
1497            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1498        for value in array.iter() {
1499            match value {
1500                Some(jsonb) => {
1501                    write!(&mut builder, "{}", jsonb).unwrap();
1502                    builder.append_value("");
1503                }
1504                None => builder.append_null(),
1505            }
1506        }
1507        builder.finish()
1508    }
1509}
1510
1511impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1512    type Error = ArrayError;
1513
1514    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1515        array
1516            .iter()
1517            .map(|o| {
1518                o.map(|s| {
1519                    s.parse()
1520                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1521                })
1522                .transpose()
1523            })
1524            .try_collect()
1525    }
1526}
1527
1528impl From<&IntervalArray> for arrow_array::StringArray {
1529    fn from(array: &IntervalArray) -> Self {
1530        let mut builder =
1531            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1532        for value in array.iter() {
1533            match value {
1534                Some(interval) => {
1535                    write!(&mut builder, "{}", interval).unwrap();
1536                    builder.append_value("");
1537                }
1538                None => builder.append_null(),
1539            }
1540        }
1541        builder.finish()
1542    }
1543}
1544
1545impl From<&JsonbArray> for arrow_array::LargeStringArray {
1546    fn from(array: &JsonbArray) -> Self {
1547        let mut builder =
1548            arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1549        for value in array.iter() {
1550            match value {
1551                Some(jsonb) => {
1552                    write!(&mut builder, "{}", jsonb).unwrap();
1553                    builder.append_value("");
1554                }
1555                None => builder.append_null(),
1556            }
1557        }
1558        builder.finish()
1559    }
1560}
1561
1562impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1563    type Error = ArrayError;
1564
1565    fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1566        array
1567            .iter()
1568            .map(|o| {
1569                o.map(|s| {
1570                    s.parse()
1571                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1572                })
1573                .transpose()
1574            })
1575            .try_collect()
1576    }
1577}
1578
1579impl From<arrow_buffer::i256> for Int256 {
1580    fn from(value: arrow_buffer::i256) -> Self {
1581        let buffer = value.to_be_bytes();
1582        Int256::from_be_bytes(buffer)
1583    }
1584}
1585
1586impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1587    fn from(val: Int256Ref<'a>) -> Self {
1588        let buffer = val.to_be_bytes();
1589        arrow_buffer::i256::from_be_bytes(buffer)
1590    }
1591}
1592
1593impl From<&Int256Array> for arrow_array::Decimal256Array {
1594    fn from(array: &Int256Array) -> Self {
1595        array
1596            .iter()
1597            .map(|o| o.map(arrow_buffer::i256::from))
1598            .collect()
1599    }
1600}
1601
1602impl From<&arrow_array::Decimal256Array> for Int256Array {
1603    fn from(array: &arrow_array::Decimal256Array) -> Self {
1604        let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1605
1606        values
1607            .iter()
1608            .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1609            .collect()
1610    }
1611}
1612
1613/// This function checks whether the schema of a Parquet file matches the user-defined schema in RisingWave.
1614/// It handles the following special cases:
1615/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `Timestamp` type.
1616/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `Timestamptz` type.
1617/// - Since RisingWave does not have an `UInt` type:
1618///   - Arrow's `UInt8` matches with RisingWave's `Int16`.
1619///   - Arrow's `UInt16` matches with RisingWave's `Int32`.
1620///   - Arrow's `UInt32` matches with RisingWave's `Int64`.
1621///   - Arrow's `UInt64` matches with RisingWave's `Decimal`.
1622/// - Arrow's `Float16` matches with RisingWave's `Float32`.
1623///
1624/// Nested data type matching:
1625/// - Struct: Arrow's `Struct` type matches with RisingWave's `Struct` type recursively, requiring that all expected fields exist and match by name and type. Extra Arrow fields are allowed.
1626/// - List: Arrow's `List` type matches with RisingWave's `List` type recursively, requiring the same element type.
1627/// - Map: Arrow's `Map` type matches with RisingWave's `Map` type recursively, requiring the key and value types to match, and the inner struct must have exactly two fields named "key" and "value".
1628pub fn is_parquet_schema_match_source_schema(
1629    arrow_data_type: &arrow_schema::DataType,
1630    rw_data_type: &crate::types::DataType,
1631) -> bool {
1632    use arrow_schema::DataType as ArrowType;
1633
1634    use crate::types::{DataType as RwType, MapType, StructType};
1635
1636    match (arrow_data_type, rw_data_type) {
1637        // Primitive type matching and special cases
1638        (ArrowType::Boolean, RwType::Boolean)
1639        | (ArrowType::Int8 | ArrowType::Int16 | ArrowType::UInt8, RwType::Int16)
1640        | (ArrowType::Int32 | ArrowType::UInt16, RwType::Int32)
1641        | (ArrowType::Int64 | ArrowType::UInt32, RwType::Int64)
1642        | (ArrowType::UInt64 | ArrowType::Decimal128(_, _), RwType::Decimal)
1643        | (ArrowType::Decimal256(_, _), RwType::Int256)
1644        | (ArrowType::Float16 | ArrowType::Float32, RwType::Float32)
1645        | (ArrowType::Float64, RwType::Float64)
1646        | (ArrowType::Timestamp(_, None), RwType::Timestamp)
1647        | (ArrowType::Timestamp(_, Some(_)), RwType::Timestamptz)
1648        | (ArrowType::Date32, RwType::Date)
1649        | (ArrowType::Time32(_) | ArrowType::Time64(_), RwType::Time)
1650        | (ArrowType::Interval(arrow_schema::IntervalUnit::MonthDayNano), RwType::Interval)
1651        | (ArrowType::Utf8 | ArrowType::LargeUtf8, RwType::Varchar)
1652        | (
1653            ArrowType::Binary | ArrowType::LargeBinary | ArrowType::FixedSizeBinary(_),
1654            RwType::Bytea,
1655        ) => true,
1656
1657        // Struct type recursive matching
1658        // Arrow's Struct matches RisingWave's Struct if all expected field names exist and types
1659        // match recursively. Extra Arrow fields are allowed and field order is ignored.
1660        (ArrowType::Struct(arrow_fields), RwType::Struct(rw_struct)) => {
1661            if arrow_fields.len() < rw_struct.len() {
1662                return false;
1663            }
1664            for (rw_name, rw_ty) in rw_struct.iter() {
1665                let Some(arrow_field) = arrow_fields.iter().find(|f| f.name() == rw_name) else {
1666                    return false;
1667                };
1668                if !is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_ty) {
1669                    return false;
1670                }
1671            }
1672            true
1673        }
1674        // List type recursive matching
1675        // Arrow's List matches RisingWave's List if the element type matches recursively
1676        (ArrowType::List(arrow_field), RwType::List(rw_list_ty)) => {
1677            is_parquet_schema_match_source_schema(arrow_field.data_type(), rw_list_ty.elem())
1678        }
1679        // Map type recursive matching
1680        // Arrow's Map matches RisingWave's Map if the key and value types match recursively,
1681        // and the inner struct has exactly two fields named "key" and "value"
1682        (ArrowType::Map(arrow_field, _), RwType::Map(rw_map_ty)) => {
1683            if let ArrowType::Struct(fields) = arrow_field.data_type() {
1684                if fields.len() != 2 {
1685                    return false;
1686                }
1687                let key_field = &fields[0];
1688                let value_field = &fields[1];
1689                if key_field.name() != "key" || value_field.name() != "value" {
1690                    return false;
1691                }
1692                let (rw_key_ty, rw_value_ty) = (rw_map_ty.key(), rw_map_ty.value());
1693                is_parquet_schema_match_source_schema(key_field.data_type(), rw_key_ty)
1694                    && is_parquet_schema_match_source_schema(value_field.data_type(), rw_value_ty)
1695            } else {
1696                false
1697            }
1698        }
1699        // Fallback: types do not match
1700        _ => false,
1701    }
1702}
// Unit tests for `is_parquet_schema_match_source_schema` and for the round-trip conversions
// between RisingWave arrays and Arrow arrays in this file.
#[cfg(test)]
mod tests {

    use arrow_schema::{DataType as ArrowType, Field as ArrowField};

    use super::*;
    use crate::types::{DataType as RwType, MapType, StructType};

    /// Struct matching. Each case and its expected result:
    /// - identical field set: match
    /// - Arrow superset of fields: match
    /// - reordered fields: match
    /// - a required field name missing from Arrow: no match
    #[test]
    fn test_struct_schema_match() {
        // Arrow: struct<f1: Double, f2: Utf8>

        let arrow_struct = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
            ]
            .into(),
        );
        // RW: struct<f1 Double, f2 Varchar>
        let rw_struct = RwType::Struct(StructType::new(vec![
            ("f1".to_owned(), RwType::Float64),
            ("f2".to_owned(), RwType::Varchar),
        ]));
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct,
            &rw_struct
        ));

        // Arrow is a superset of RW struct fields.
        let arrow_struct_superset = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f2", ArrowType::Utf8, true),
                ArrowField::new("f3", ArrowType::Int32, true),
            ]
            .into(),
        );
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct_superset,
            &rw_struct
        ));

        // Field order is ignored for struct matching.
        let arrow_struct_reordered = ArrowType::Struct(
            vec![
                ArrowField::new("f2", ArrowType::Utf8, true),
                ArrowField::new("f1", ArrowType::Float64, true),
            ]
            .into(),
        );
        assert!(is_parquet_schema_match_source_schema(
            &arrow_struct_reordered,
            &rw_struct
        ));

        // Field names do not match: Arrow has `f3` instead of the required `f2`.
        let arrow_struct2 = ArrowType::Struct(
            vec![
                ArrowField::new("f1", ArrowType::Float64, true),
                ArrowField::new("f3", ArrowType::Utf8, true),
            ]
            .into(),
        );
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_struct2,
            &rw_struct
        ));
    }

    /// Converting an Arrow struct that carries an extra field (`baz`) against an expected
    /// field listing only `foo` and `bar` yields a RisingWave struct containing just those
    /// two fields, in the expected order, with the corresponding values.
    #[test]
    fn test_struct_projection_from_arrow() {
        use std::sync::Arc;

        use itertools::Itertools;

        // Uses only the default `FromArrow` methods.
        struct Dummy;
        impl FromArrow for Dummy {}

        // Actual Arrow struct: struct<foo:int32, bar:utf8, baz:int32>
        let actual_fields: arrow_schema::Fields = vec![
            ArrowField::new("foo", ArrowType::Int32, true),
            ArrowField::new("bar", ArrowType::Utf8, true),
            ArrowField::new("baz", ArrowType::Int32, true),
        ]
        .into();
        let foo: arrow_array::ArrayRef =
            Arc::new(arrow_array::Int32Array::from(vec![Some(10), Some(20)]));
        let bar: arrow_array::ArrayRef =
            Arc::new(arrow_array::StringArray::from(vec![Some("a"), Some("b")]));
        let baz: arrow_array::ArrayRef =
            Arc::new(arrow_array::Int32Array::from(vec![Some(100), Some(200)]));
        let actual_struct = arrow_array::StructArray::new(actual_fields, vec![foo, bar, baz], None);
        let actual_struct_ref: arrow_array::ArrayRef = Arc::new(actual_struct);

        // Expected struct in RW schema (via to_arrow_field): struct<foo:int32, bar:utf8>
        let expected_field = ArrowField::new(
            "s",
            ArrowType::Struct(
                vec![
                    ArrowField::new("foo", ArrowType::Int32, true),
                    ArrowField::new("bar", ArrowType::Utf8, true),
                ]
                .into(),
            ),
            true,
        );

        let array_impl = Dummy
            .from_array(&expected_field, &actual_struct_ref)
            .unwrap();

        let ArrayImpl::Struct(s) = array_impl else {
            panic!("expected RW StructArray");
        };

        // `baz` must have been dropped.
        let DataType::Struct(st) = s.data_type() else {
            panic!("expected RW struct type");
        };
        assert_eq!(st.len(), 2);
        assert_eq!(st.iter().map(|(n, _)| n).collect_vec(), vec!["foo", "bar"]);

        let v0 = s.value_at(0).unwrap().to_owned_scalar();
        let v1 = s.value_at(1).unwrap().to_owned_scalar();
        assert_eq!(
            v0,
            StructValue::new(vec![
                Some(ScalarImpl::Int32(10)),
                Some(ScalarImpl::Utf8("a".into()))
            ])
        );
        assert_eq!(
            v1,
            StructValue::new(vec![
                Some(ScalarImpl::Int32(20)),
                Some(ScalarImpl::Utf8("b".into()))
            ])
        );
    }

    /// List matching compares element types: list<double> matches RW list<double>
    /// but not RW list<int32>.
    #[test]
    fn test_list_schema_match() {
        // Arrow: list<double>
        let arrow_list =
            ArrowType::List(Box::new(ArrowField::new("item", ArrowType::Float64, true)).into());
        // RW: list<double>
        let rw_list = RwType::Float64.list();
        assert!(is_parquet_schema_match_source_schema(&arrow_list, &rw_list));

        let rw_list2 = RwType::Int32.list();
        assert!(!is_parquet_schema_match_source_schema(
            &arrow_list,
            &rw_list2
        ));
    }

    /// Map matching requires compatible key and value types, and entry fields named exactly
    /// `key` and `value`.
    #[test]
    fn test_map_schema_match() {
        // Arrow: map<utf8, int32>
        let arrow_map = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("key", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        // RW: map<varchar, int32>
        let rw_map = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Int32));
        assert!(is_parquet_schema_match_source_schema(&arrow_map, &rw_map));

        // Key type does not match
        let rw_map2 = RwType::Map(MapType::from_kv(RwType::Int32, RwType::Int32));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map2));

        // Value type does not match
        let rw_map3 = RwType::Map(MapType::from_kv(RwType::Varchar, RwType::Float64));
        assert!(!is_parquet_schema_match_source_schema(&arrow_map, &rw_map3));

        // Arrow inner struct field name does not match ("k" instead of "key")
        let arrow_map2 = ArrowType::Map(
            Arc::new(ArrowField::new(
                "entries",
                ArrowType::Struct(
                    vec![
                        ArrowField::new("k", ArrowType::Utf8, false),
                        ArrowField::new("value", ArrowType::Int32, true),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );
        assert!(!is_parquet_schema_match_source_schema(&arrow_map2, &rw_map));
    }

    // Round-trip tests: convert a RisingWave array to Arrow and back, then expect the
    // original array unchanged (nulls included).

    #[test]
    fn bool() {
        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
        let arrow = arrow_array::BooleanArray::from(&array);
        assert_eq!(BoolArray::from(&arrow), array);
    }

    #[test]
    fn i16() {
        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int16Array::from(&array);
        assert_eq!(I16Array::from(&arrow), array);
    }

    #[test]
    fn i32() {
        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int32Array::from(&array);
        assert_eq!(I32Array::from(&arrow), array);
    }

    #[test]
    fn i64() {
        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
        let arrow = arrow_array::Int64Array::from(&array);
        assert_eq!(I64Array::from(&arrow), array);
    }

    #[test]
    fn f32() {
        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float32Array::from(&array);
        assert_eq!(F32Array::from(&arrow), array);
    }

    #[test]
    fn f64() {
        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
        let arrow = arrow_array::Float64Array::from(&array);
        assert_eq!(F64Array::from(&arrow), array);
    }

    // One-way widening conversions from Arrow types that RisingWave lacks: each test
    // checks the boundary values of the source type survive the conversion.

    /// Arrow `Int8` converts to a RisingWave `I16Array`.
    #[test]
    fn int8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt8` converts to a RisingWave `I16Array`.
    #[test]
    fn uint8() {
        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
        let converted: PrimitiveArray<i16> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt16` (including `u16::MAX`) converts to a RisingWave `I32Array`.
    #[test]
    fn uint16() {
        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
        let converted: PrimitiveArray<i32> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt32` (including `u32::MAX`) converts to a RisingWave `I64Array`.
    #[test]
    fn uint32() {
        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
        let converted: PrimitiveArray<i64> = (&arr).into();
        assert_eq!(converted, array);
    }

    /// Arrow `UInt64` (including `u64::MAX`) converts, fallibly, to a RisingWave `DecimalArray`.
    #[test]
    fn uint64() {
        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
            None,
            Some(Decimal::Normalized("7".parse().unwrap())),
            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
        ]);
        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
        assert_eq!(converted, array);
    }

    /// Dates on both sides of the CE epoch round-trip through `Date32Array`.
    #[test]
    fn date() {
        let array = DateArray::from_iter([
            None,
            Date::with_days_since_ce(12345).ok(),
            Date::with_days_since_ce(-12345).ok(),
        ]);
        let arrow = arrow_array::Date32Array::from(&array);
        assert_eq!(DateArray::from(&arrow), array);
    }

    /// The last microsecond of a day round-trips through `Time64MicrosecondArray`.
    #[test]
    fn time() {
        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
        assert_eq!(TimeArray::from(&arrow), array);
    }

    #[test]
    fn timestamp() {
        let array =
            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
        assert_eq!(TimestampArray::from(&arrow), array);
    }

    /// Large positive and negative month/day/microsecond components round-trip through
    /// `IntervalMonthDayNanoArray`.
    #[test]
    fn interval() {
        let array = IntervalArray::from_iter([
            None,
            Some(Interval::from_month_day_usec(
                1_000_000,
                1_000,
                1_000_000_000,
            )),
            Some(Interval::from_month_day_usec(
                -1_000_000,
                -1_000,
                -1_000_000_000,
            )),
        ]);
        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
        assert_eq!(IntervalArray::from(&arrow), array);
    }

    #[test]
    fn string() {
        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(Utf8Array::from(&arrow), array);
    }

    #[test]
    fn binary() {
        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
        let arrow = arrow_array::BinaryArray::from(&array);
        assert_eq!(BytesArray::from(&arrow), array);
    }

    /// A `FixedSizeBinary(16)` field maps to `Bytea`, and its values (here a UUID-sized
    /// payload and a null) convert to a `BytesArray` with the same bytes.
    #[test]
    fn fixed_size_binary() {
        // Uses only the default `FromArrow` methods.
        struct Dummy;
        impl FromArrow for Dummy {}

        let uuid = [
            0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
            0xde, 0xf0,
        ];
        let arrow_array = arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
            [None, Some(uuid)].into_iter(),
            16,
        )
        .unwrap();
        let field =
            arrow_schema::Field::new("u", arrow_schema::DataType::FixedSizeBinary(16), true);

        assert_eq!(Dummy.from_field(&field).unwrap(), DataType::Bytea);

        let rw_array = Dummy
            .from_array(&field, &(Arc::new(arrow_array) as arrow_array::ArrayRef))
            .unwrap();
        let expected = BytesArray::from_iter([None, Some(uuid.as_slice())]);
        assert_eq!(rw_array.as_bytea(), &expected);
    }

    /// Decimals, including NaN and both infinities, round-trip through both
    /// `LargeBinaryArray` and `StringArray` encodings.
    #[test]
    fn decimal() {
        let array = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("123.4".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let arrow = arrow_array::LargeBinaryArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
    }

    /// JSON scalars, arrays and objects round-trip through both `LargeStringArray` and
    /// `StringArray`.
    #[test]
    fn jsonb() {
        let array = JsonbArray::from_iter([
            None,
            Some("null".parse().unwrap()),
            Some("false".parse().unwrap()),
            Some("1".parse().unwrap()),
            Some("[1, 2, 3]".parse().unwrap()),
            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
        ]);
        let arrow = arrow_array::LargeStringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);

        let arrow = arrow_array::StringArray::from(&array);
        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
    }

    /// 256-bit integers round-trip through `Decimal256Array`. The values include products
    /// of `i64::MAX` and the `Int256` min/max extremes, so they exercise all 32 bytes.
    #[test]
    fn int256() {
        let values = [
            None,
            Some(Int256::from(1)),
            Some(Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
            Some(
                Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX)
                    * Int256::from(i64::MAX),
            ),
            Some(Int256::min_value()),
            Some(Int256::max_value()),
        ];

        let array =
            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
        let arrow = arrow_array::Decimal256Array::from(&array);
        assert_eq!(Int256Array::from(&arrow), array);
    }
}