risingwave_common/array/arrow/
arrow_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Converts between arrays and Apache Arrow arrays.
16//!
//! This file acts as a template file for conversion code between
//! arrays and different versions of Apache Arrow.
19//!
20//! The conversion logic will be implemented for the arrow version specified in the outer mod by
21//! `super::arrow_xxx`, such as `super::arrow_array`.
22//!
23//! When we want to implement the conversion logic for an arrow version, we first
24//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx`
25//! using the `use` clause, and then declare a sub-mod and set its file path with attribute
26//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to
27//! the new mod file, and the conversion logic can be implemented for the corresponding arrow
28//! version.
29//!
//! An example can be seen in `arrow_default.rs`, which is reproduced below:
31//! ```ignore
32//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
33//!
34//! #[allow(clippy::duplicate_mod)]
35//! #[path = "./arrow_impl.rs"]
36//! mod arrow_impl;
37//! ```
38
39// Is this a bug? Why do we have these lints?
40#![allow(unused_imports)]
41#![allow(dead_code)]
42
43use std::fmt::Write;
44
45use arrow_array::array;
46use arrow_array::cast::AsArray;
47use arrow_buffer::OffsetBuffer;
48use arrow_schema::TimeUnit;
49use chrono::{DateTime, NaiveDateTime, NaiveTime};
50use itertools::Itertools;
51
52use super::arrow_schema::IntervalUnit;
53// This is important because we want to use the arrow version specified by the outer mod.
54use super::{ArrowIntervalType, arrow_array, arrow_buffer, arrow_cast, arrow_schema};
55// Other import should always use the absolute path.
56use crate::array::*;
57use crate::types::{DataType as RwDataType, *};
58use crate::util::iter_util::ZipEqFast;
59
60/// Defines how to convert RisingWave arrays to Arrow arrays.
61///
62/// This trait allows for customized conversion logic for different external systems using Arrow.
63/// The default implementation is based on the `From` implemented in this mod.
64pub trait ToArrow {
65    /// Converts RisingWave `DataChunk` to Arrow `RecordBatch` with specified schema.
66    ///
67    /// This function will try to convert the array if the type is not same with the schema.
68    fn to_record_batch(
69        &self,
70        schema: arrow_schema::SchemaRef,
71        chunk: &DataChunk,
72    ) -> Result<arrow_array::RecordBatch, ArrayError> {
73        // compact the chunk if it's not compacted
74        if !chunk.is_compacted() {
75            let c = chunk.clone();
76            return self.to_record_batch(schema, &c.compact());
77        }
78
79        // convert each column to arrow array
80        let columns: Vec<_> = chunk
81            .columns()
82            .iter()
83            .zip_eq_fast(schema.fields().iter())
84            .map(|(column, field)| self.to_array(field.data_type(), column))
85            .try_collect()?;
86
87        // create record batch
88        let opts =
89            arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
90        arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
91            .map_err(ArrayError::to_arrow)
92    }
93
94    /// Converts RisingWave array to Arrow array.
95    fn to_array(
96        &self,
97        data_type: &arrow_schema::DataType,
98        array: &ArrayImpl,
99    ) -> Result<arrow_array::ArrayRef, ArrayError> {
100        let arrow_array = match array {
101            ArrayImpl::Bool(array) => self.bool_to_arrow(array),
102            ArrayImpl::Int16(array) => self.int16_to_arrow(array),
103            ArrayImpl::Int32(array) => self.int32_to_arrow(array),
104            ArrayImpl::Int64(array) => self.int64_to_arrow(array),
105            ArrayImpl::Int256(array) => self.int256_to_arrow(array),
106            ArrayImpl::Float32(array) => self.float32_to_arrow(array),
107            ArrayImpl::Float64(array) => self.float64_to_arrow(array),
108            ArrayImpl::Date(array) => self.date_to_arrow(array),
109            ArrayImpl::Time(array) => self.time_to_arrow(array),
110            ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
111            ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
112            ArrayImpl::Interval(array) => self.interval_to_arrow(array),
113            ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
114            ArrayImpl::Bytea(array) => self.bytea_to_arrow(array),
115            ArrayImpl::Decimal(array) => self.decimal_to_arrow(data_type, array),
116            ArrayImpl::Jsonb(array) => self.jsonb_to_arrow(array),
117            ArrayImpl::Serial(array) => self.serial_to_arrow(array),
118            ArrayImpl::List(array) => self.list_to_arrow(data_type, array),
119            ArrayImpl::Struct(array) => self.struct_to_arrow(data_type, array),
120            ArrayImpl::Map(array) => self.map_to_arrow(data_type, array),
121        }?;
122        if arrow_array.data_type() != data_type {
123            arrow_cast::cast(&arrow_array, data_type).map_err(ArrayError::to_arrow)
124        } else {
125            Ok(arrow_array)
126        }
127    }
128
129    #[inline]
130    fn bool_to_arrow(&self, array: &BoolArray) -> Result<arrow_array::ArrayRef, ArrayError> {
131        Ok(Arc::new(arrow_array::BooleanArray::from(array)))
132    }
133
134    #[inline]
135    fn int16_to_arrow(&self, array: &I16Array) -> Result<arrow_array::ArrayRef, ArrayError> {
136        Ok(Arc::new(arrow_array::Int16Array::from(array)))
137    }
138
139    #[inline]
140    fn int32_to_arrow(&self, array: &I32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
141        Ok(Arc::new(arrow_array::Int32Array::from(array)))
142    }
143
144    #[inline]
145    fn int64_to_arrow(&self, array: &I64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
146        Ok(Arc::new(arrow_array::Int64Array::from(array)))
147    }
148
149    #[inline]
150    fn float32_to_arrow(&self, array: &F32Array) -> Result<arrow_array::ArrayRef, ArrayError> {
151        Ok(Arc::new(arrow_array::Float32Array::from(array)))
152    }
153
154    #[inline]
155    fn float64_to_arrow(&self, array: &F64Array) -> Result<arrow_array::ArrayRef, ArrayError> {
156        Ok(Arc::new(arrow_array::Float64Array::from(array)))
157    }
158
159    #[inline]
160    fn utf8_to_arrow(&self, array: &Utf8Array) -> Result<arrow_array::ArrayRef, ArrayError> {
161        Ok(Arc::new(arrow_array::StringArray::from(array)))
162    }
163
164    #[inline]
165    fn int256_to_arrow(&self, array: &Int256Array) -> Result<arrow_array::ArrayRef, ArrayError> {
166        Ok(Arc::new(arrow_array::Decimal256Array::from(array)))
167    }
168
169    #[inline]
170    fn date_to_arrow(&self, array: &DateArray) -> Result<arrow_array::ArrayRef, ArrayError> {
171        Ok(Arc::new(arrow_array::Date32Array::from(array)))
172    }
173
174    #[inline]
175    fn timestamp_to_arrow(
176        &self,
177        array: &TimestampArray,
178    ) -> Result<arrow_array::ArrayRef, ArrayError> {
179        Ok(Arc::new(arrow_array::TimestampMicrosecondArray::from(
180            array,
181        )))
182    }
183
184    #[inline]
185    fn timestamptz_to_arrow(
186        &self,
187        array: &TimestamptzArray,
188    ) -> Result<arrow_array::ArrayRef, ArrayError> {
189        Ok(Arc::new(
190            arrow_array::TimestampMicrosecondArray::from(array).with_timezone_utc(),
191        ))
192    }
193
194    #[inline]
195    fn time_to_arrow(&self, array: &TimeArray) -> Result<arrow_array::ArrayRef, ArrayError> {
196        Ok(Arc::new(arrow_array::Time64MicrosecondArray::from(array)))
197    }
198
199    #[inline]
200    fn interval_to_arrow(
201        &self,
202        array: &IntervalArray,
203    ) -> Result<arrow_array::ArrayRef, ArrayError> {
204        Ok(Arc::new(arrow_array::IntervalMonthDayNanoArray::from(
205            array,
206        )))
207    }
208
209    #[inline]
210    fn bytea_to_arrow(&self, array: &BytesArray) -> Result<arrow_array::ArrayRef, ArrayError> {
211        Ok(Arc::new(arrow_array::BinaryArray::from(array)))
212    }
213
214    // Decimal values are stored as ASCII text representation in a string array.
215    #[inline]
216    fn decimal_to_arrow(
217        &self,
218        _data_type: &arrow_schema::DataType,
219        array: &DecimalArray,
220    ) -> Result<arrow_array::ArrayRef, ArrayError> {
221        Ok(Arc::new(arrow_array::StringArray::from(array)))
222    }
223
224    // JSON values are stored as text representation in a string array.
225    #[inline]
226    fn jsonb_to_arrow(&self, array: &JsonbArray) -> Result<arrow_array::ArrayRef, ArrayError> {
227        Ok(Arc::new(arrow_array::StringArray::from(array)))
228    }
229
230    #[inline]
231    fn serial_to_arrow(&self, array: &SerialArray) -> Result<arrow_array::ArrayRef, ArrayError> {
232        Ok(Arc::new(arrow_array::Int64Array::from(array)))
233    }
234
235    #[inline]
236    fn list_to_arrow(
237        &self,
238        data_type: &arrow_schema::DataType,
239        array: &ListArray,
240    ) -> Result<arrow_array::ArrayRef, ArrayError> {
241        let arrow_schema::DataType::List(field) = data_type else {
242            return Err(ArrayError::to_arrow("Invalid list type"));
243        };
244        let values = self.to_array(field.data_type(), array.values())?;
245        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
246        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
247        Ok(Arc::new(arrow_array::ListArray::new(
248            field.clone(),
249            offsets,
250            values,
251            nulls,
252        )))
253    }
254
255    #[inline]
256    fn struct_to_arrow(
257        &self,
258        data_type: &arrow_schema::DataType,
259        array: &StructArray,
260    ) -> Result<arrow_array::ArrayRef, ArrayError> {
261        let arrow_schema::DataType::Struct(fields) = data_type else {
262            return Err(ArrayError::to_arrow("Invalid struct type"));
263        };
264        Ok(Arc::new(arrow_array::StructArray::new(
265            fields.clone(),
266            array
267                .fields()
268                .zip_eq_fast(fields)
269                .map(|(arr, field)| self.to_array(field.data_type(), arr))
270                .try_collect::<_, _, ArrayError>()?,
271            Some(array.null_bitmap().into()),
272        )))
273    }
274
275    #[inline]
276    fn map_to_arrow(
277        &self,
278        data_type: &arrow_schema::DataType,
279        array: &MapArray,
280    ) -> Result<arrow_array::ArrayRef, ArrayError> {
281        let arrow_schema::DataType::Map(field, ordered) = data_type else {
282            return Err(ArrayError::to_arrow("Invalid map type"));
283        };
284        if *ordered {
285            return Err(ArrayError::to_arrow("Sorted map is not supported"));
286        }
287        let values = self
288            .struct_to_arrow(field.data_type(), array.as_struct())?
289            .as_struct()
290            .clone();
291        let offsets = OffsetBuffer::new(array.offsets().iter().map(|&o| o as i32).collect());
292        let nulls = (!array.null_bitmap().all()).then(|| array.null_bitmap().into());
293        Ok(Arc::new(arrow_array::MapArray::new(
294            field.clone(),
295            offsets,
296            values,
297            nulls,
298            *ordered,
299        )))
300    }
301
302    /// Convert RisingWave data type to Arrow data type.
303    ///
304    /// This function returns a `Field` instead of `DataType` because some may be converted to
305    /// extension types which require additional metadata in the field.
306    fn to_arrow_field(
307        &self,
308        name: &str,
309        value: &DataType,
310    ) -> Result<arrow_schema::Field, ArrayError> {
311        let data_type = match value {
312            // using the inline function
313            DataType::Boolean => self.bool_type_to_arrow(),
314            DataType::Int16 => self.int16_type_to_arrow(),
315            DataType::Int32 => self.int32_type_to_arrow(),
316            DataType::Int64 => self.int64_type_to_arrow(),
317            DataType::Int256 => self.int256_type_to_arrow(),
318            DataType::Float32 => self.float32_type_to_arrow(),
319            DataType::Float64 => self.float64_type_to_arrow(),
320            DataType::Date => self.date_type_to_arrow(),
321            DataType::Time => self.time_type_to_arrow(),
322            DataType::Timestamp => self.timestamp_type_to_arrow(),
323            DataType::Timestamptz => self.timestamptz_type_to_arrow(),
324            DataType::Interval => self.interval_type_to_arrow(),
325            DataType::Varchar => self.varchar_type_to_arrow(),
326            DataType::Bytea => self.bytea_type_to_arrow(),
327            DataType::Serial => self.serial_type_to_arrow(),
328            DataType::Decimal => return Ok(self.decimal_type_to_arrow(name)),
329            DataType::Jsonb => return Ok(self.jsonb_type_to_arrow(name)),
330            DataType::Struct(fields) => self.struct_type_to_arrow(fields)?,
331            DataType::List(datatype) => self.list_type_to_arrow(datatype)?,
332            DataType::Map(datatype) => self.map_type_to_arrow(datatype)?,
333        };
334        Ok(arrow_schema::Field::new(name, data_type, true))
335    }
336
337    #[inline]
338    fn bool_type_to_arrow(&self) -> arrow_schema::DataType {
339        arrow_schema::DataType::Boolean
340    }
341
342    #[inline]
343    fn int16_type_to_arrow(&self) -> arrow_schema::DataType {
344        arrow_schema::DataType::Int16
345    }
346
347    #[inline]
348    fn int32_type_to_arrow(&self) -> arrow_schema::DataType {
349        arrow_schema::DataType::Int32
350    }
351
352    #[inline]
353    fn int64_type_to_arrow(&self) -> arrow_schema::DataType {
354        arrow_schema::DataType::Int64
355    }
356
357    #[inline]
358    fn int256_type_to_arrow(&self) -> arrow_schema::DataType {
359        arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)
360    }
361
362    #[inline]
363    fn float32_type_to_arrow(&self) -> arrow_schema::DataType {
364        arrow_schema::DataType::Float32
365    }
366
367    #[inline]
368    fn float64_type_to_arrow(&self) -> arrow_schema::DataType {
369        arrow_schema::DataType::Float64
370    }
371
372    #[inline]
373    fn date_type_to_arrow(&self) -> arrow_schema::DataType {
374        arrow_schema::DataType::Date32
375    }
376
377    #[inline]
378    fn time_type_to_arrow(&self) -> arrow_schema::DataType {
379        arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond)
380    }
381
382    #[inline]
383    fn timestamp_type_to_arrow(&self) -> arrow_schema::DataType {
384        arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
385    }
386
387    #[inline]
388    fn timestamptz_type_to_arrow(&self) -> arrow_schema::DataType {
389        arrow_schema::DataType::Timestamp(
390            arrow_schema::TimeUnit::Microsecond,
391            Some("+00:00".into()),
392        )
393    }
394
395    #[inline]
396    fn interval_type_to_arrow(&self) -> arrow_schema::DataType {
397        arrow_schema::DataType::Interval(arrow_schema::IntervalUnit::MonthDayNano)
398    }
399
400    #[inline]
401    fn varchar_type_to_arrow(&self) -> arrow_schema::DataType {
402        arrow_schema::DataType::Utf8
403    }
404
405    #[inline]
406    fn jsonb_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
407        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
408            .with_metadata([("ARROW:extension:name".into(), "arrowudf.json".into())].into())
409    }
410
411    #[inline]
412    fn bytea_type_to_arrow(&self) -> arrow_schema::DataType {
413        arrow_schema::DataType::Binary
414    }
415
416    #[inline]
417    fn decimal_type_to_arrow(&self, name: &str) -> arrow_schema::Field {
418        arrow_schema::Field::new(name, arrow_schema::DataType::Utf8, true)
419            .with_metadata([("ARROW:extension:name".into(), "arrowudf.decimal".into())].into())
420    }
421
422    #[inline]
423    fn serial_type_to_arrow(&self) -> arrow_schema::DataType {
424        arrow_schema::DataType::Int64
425    }
426
427    #[inline]
428    fn list_type_to_arrow(
429        &self,
430        elem_type: &DataType,
431    ) -> Result<arrow_schema::DataType, ArrayError> {
432        Ok(arrow_schema::DataType::List(Arc::new(
433            self.to_arrow_field("item", elem_type)?,
434        )))
435    }
436
437    #[inline]
438    fn struct_type_to_arrow(
439        &self,
440        fields: &StructType,
441    ) -> Result<arrow_schema::DataType, ArrayError> {
442        Ok(arrow_schema::DataType::Struct(
443            fields
444                .iter()
445                .map(|(name, ty)| self.to_arrow_field(name, ty))
446                .try_collect::<_, _, ArrayError>()?,
447        ))
448    }
449
450    #[inline]
451    fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
452        let sorted = false;
453        // "key" is always non-null
454        let key = self
455            .to_arrow_field("key", map_type.key())?
456            .with_nullable(false);
457        let value = self.to_arrow_field("value", map_type.value())?;
458        Ok(arrow_schema::DataType::Map(
459            Arc::new(arrow_schema::Field::new(
460                "entries",
461                arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
462                // "entries" is always non-null
463                false,
464            )),
465            sorted,
466        ))
467    }
468}
469
470/// Defines how to convert Arrow arrays to RisingWave arrays.
471#[allow(clippy::wrong_self_convention)]
472pub trait FromArrow {
473    /// Converts Arrow `RecordBatch` to RisingWave `DataChunk`.
474    fn from_record_batch(&self, batch: &arrow_array::RecordBatch) -> Result<DataChunk, ArrayError> {
475        let mut columns = Vec::with_capacity(batch.num_columns());
476        for (array, field) in batch.columns().iter().zip_eq_fast(batch.schema().fields()) {
477            let column = Arc::new(self.from_array(field, array)?);
478            columns.push(column);
479        }
480        Ok(DataChunk::new(columns, batch.num_rows()))
481    }
482
483    /// Converts Arrow `Fields` to RisingWave `StructType`.
484    fn from_fields(&self, fields: &arrow_schema::Fields) -> Result<StructType, ArrayError> {
485        Ok(StructType::new(
486            fields
487                .iter()
488                .map(|f| Ok((f.name().clone(), self.from_field(f)?)))
489                .try_collect::<_, Vec<_>, ArrayError>()?,
490        ))
491    }
492
493    /// Converts Arrow `Field` to RisingWave `DataType`.
494    fn from_field(&self, field: &arrow_schema::Field) -> Result<DataType, ArrayError> {
495        use arrow_schema::DataType::*;
496        use arrow_schema::IntervalUnit::*;
497        use arrow_schema::TimeUnit::*;
498
499        // extension type
500        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
501            return self.from_extension_type(type_name, field.data_type());
502        }
503
504        Ok(match field.data_type() {
505            Boolean => DataType::Boolean,
506            Int16 => DataType::Int16,
507            Int32 => DataType::Int32,
508            Int64 => DataType::Int64,
509            Int8 => DataType::Int16,
510            UInt8 => DataType::Int16,
511            UInt16 => DataType::Int32,
512            UInt32 => DataType::Int64,
513            UInt64 => DataType::Decimal,
514            Float16 => DataType::Float32,
515            Float32 => DataType::Float32,
516            Float64 => DataType::Float64,
517            Decimal128(_, _) => DataType::Decimal,
518            Decimal256(_, _) => DataType::Int256,
519            Date32 => DataType::Date,
520            Time64(Microsecond) => DataType::Time,
521            Timestamp(Microsecond, None) => DataType::Timestamp,
522            Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
523            Timestamp(Second, None) => DataType::Timestamp,
524            Timestamp(Second, Some(_)) => DataType::Timestamptz,
525            Timestamp(Millisecond, None) => DataType::Timestamp,
526            Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
527            Timestamp(Nanosecond, None) => DataType::Timestamp,
528            Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
529            Interval(MonthDayNano) => DataType::Interval,
530            Utf8 => DataType::Varchar,
531            Binary => DataType::Bytea,
532            LargeUtf8 => self.from_large_utf8()?,
533            LargeBinary => self.from_large_binary()?,
534            List(field) => DataType::List(Box::new(self.from_field(field)?)),
535            Struct(fields) => DataType::Struct(self.from_fields(fields)?),
536            Map(field, _is_sorted) => {
537                let entries = self.from_field(field)?;
538                DataType::Map(MapType::try_from_entries(entries).map_err(|e| {
539                    ArrayError::from_arrow(format!("invalid arrow map field: {field:?}, err: {e}"))
540                })?)
541            }
542            t => {
543                return Err(ArrayError::from_arrow(format!(
544                    "unsupported arrow data type: {t:?}"
545                )));
546            }
547        })
548    }
549
550    /// Converts Arrow `LargeUtf8` type to RisingWave data type.
551    fn from_large_utf8(&self) -> Result<DataType, ArrayError> {
552        Ok(DataType::Varchar)
553    }
554
555    /// Converts Arrow `LargeBinary` type to RisingWave data type.
556    fn from_large_binary(&self) -> Result<DataType, ArrayError> {
557        Ok(DataType::Bytea)
558    }
559
560    /// Converts Arrow extension type to RisingWave `DataType`.
561    fn from_extension_type(
562        &self,
563        type_name: &str,
564        physical_type: &arrow_schema::DataType,
565    ) -> Result<DataType, ArrayError> {
566        match (type_name, physical_type) {
567            ("arrowudf.decimal", arrow_schema::DataType::Utf8) => Ok(DataType::Decimal),
568            ("arrowudf.json", arrow_schema::DataType::Utf8) => Ok(DataType::Jsonb),
569            _ => Err(ArrayError::from_arrow(format!(
570                "unsupported extension type: {type_name:?}"
571            ))),
572        }
573    }
574
575    /// Converts Arrow `Array` to RisingWave `ArrayImpl`.
576    fn from_array(
577        &self,
578        field: &arrow_schema::Field,
579        array: &arrow_array::ArrayRef,
580    ) -> Result<ArrayImpl, ArrayError> {
581        use arrow_schema::DataType::*;
582        use arrow_schema::IntervalUnit::*;
583        use arrow_schema::TimeUnit::*;
584
585        // extension type
586        if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
587            return self.from_extension_array(type_name, array);
588        }
589        match array.data_type() {
590            Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
591            Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()),
592            Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
593            Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()),
594            Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()),
595            UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()),
596            UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()),
597            UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()),
598
599            UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()),
600            Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()),
601            Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()),
602            Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()),
603            Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()),
604            Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
605            Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
606            Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
607            Timestamp(Second, None) => {
608                self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
609            }
610            Timestamp(Second, Some(_)) => {
611                self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
612            }
613            Timestamp(Millisecond, None) => {
614                self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
615            }
616            Timestamp(Millisecond, Some(_)) => {
617                self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
618            }
619            Timestamp(Microsecond, None) => {
620                self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
621            }
622            Timestamp(Microsecond, Some(_)) => {
623                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
624            }
625            Timestamp(Nanosecond, None) => {
626                self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
627            }
628            Timestamp(Nanosecond, Some(_)) => {
629                self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
630            }
631            Interval(MonthDayNano) => {
632                self.from_interval_array(array.as_any().downcast_ref().unwrap())
633            }
634            Utf8 => self.from_utf8_array(array.as_any().downcast_ref().unwrap()),
635            Binary => self.from_binary_array(array.as_any().downcast_ref().unwrap()),
636            LargeUtf8 => self.from_large_utf8_array(array.as_any().downcast_ref().unwrap()),
637            LargeBinary => self.from_large_binary_array(array.as_any().downcast_ref().unwrap()),
638            List(_) => self.from_list_array(array.as_any().downcast_ref().unwrap()),
639            Struct(_) => self.from_struct_array(array.as_any().downcast_ref().unwrap()),
640            Map(_, _) => self.from_map_array(array.as_any().downcast_ref().unwrap()),
641            t => Err(ArrayError::from_arrow(format!(
642                "unsupported arrow data type: {t:?}",
643            ))),
644        }
645    }
646
647    /// Converts Arrow extension array to RisingWave `ArrayImpl`.
648    fn from_extension_array(
649        &self,
650        type_name: &str,
651        array: &arrow_array::ArrayRef,
652    ) -> Result<ArrayImpl, ArrayError> {
653        match type_name {
654            "arrowudf.decimal" => {
655                let array: &arrow_array::StringArray =
656                    array.as_any().downcast_ref().ok_or_else(|| {
657                        ArrayError::from_arrow(
658                            "expected string array for `arrowudf.decimal`".to_owned(),
659                        )
660                    })?;
661                Ok(ArrayImpl::Decimal(array.try_into()?))
662            }
663            "arrowudf.json" => {
664                let array: &arrow_array::StringArray =
665                    array.as_any().downcast_ref().ok_or_else(|| {
666                        ArrayError::from_arrow(
667                            "expected string array for `arrowudf.json`".to_owned(),
668                        )
669                    })?;
670                Ok(ArrayImpl::Jsonb(array.try_into()?))
671            }
672            _ => Err(ArrayError::from_arrow(format!(
673                "unsupported extension type: {type_name:?}"
674            ))),
675        }
676    }
677
678    fn from_bool_array(&self, array: &arrow_array::BooleanArray) -> Result<ArrayImpl, ArrayError> {
679        Ok(ArrayImpl::Bool(array.into()))
680    }
681
682    fn from_int16_array(&self, array: &arrow_array::Int16Array) -> Result<ArrayImpl, ArrayError> {
683        Ok(ArrayImpl::Int16(array.into()))
684    }
685
686    fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result<ArrayImpl, ArrayError> {
687        Ok(ArrayImpl::Int16(array.into()))
688    }
689
690    fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result<ArrayImpl, ArrayError> {
691        Ok(ArrayImpl::Int16(array.into()))
692    }
693
694    fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result<ArrayImpl, ArrayError> {
695        Ok(ArrayImpl::Int32(array.into()))
696    }
697
698    fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result<ArrayImpl, ArrayError> {
699        Ok(ArrayImpl::Int64(array.into()))
700    }
701
702    fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result<ArrayImpl, ArrayError> {
703        Ok(ArrayImpl::Int32(array.into()))
704    }
705
706    fn from_int64_array(&self, array: &arrow_array::Int64Array) -> Result<ArrayImpl, ArrayError> {
707        Ok(ArrayImpl::Int64(array.into()))
708    }
709
710    fn from_int256_array(
711        &self,
712        array: &arrow_array::Decimal256Array,
713    ) -> Result<ArrayImpl, ArrayError> {
714        Ok(ArrayImpl::Int256(array.into()))
715    }
716
717    fn from_decimal128_array(
718        &self,
719        array: &arrow_array::Decimal128Array,
720    ) -> Result<ArrayImpl, ArrayError> {
721        Ok(ArrayImpl::Decimal(array.try_into()?))
722    }
723
724    fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result<ArrayImpl, ArrayError> {
725        Ok(ArrayImpl::Decimal(array.try_into()?))
726    }
727
728    fn from_float16_array(
729        &self,
730        array: &arrow_array::Float16Array,
731    ) -> Result<ArrayImpl, ArrayError> {
732        Ok(ArrayImpl::Float32(array.try_into()?))
733    }
734
735    fn from_float32_array(
736        &self,
737        array: &arrow_array::Float32Array,
738    ) -> Result<ArrayImpl, ArrayError> {
739        Ok(ArrayImpl::Float32(array.into()))
740    }
741
742    fn from_float64_array(
743        &self,
744        array: &arrow_array::Float64Array,
745    ) -> Result<ArrayImpl, ArrayError> {
746        Ok(ArrayImpl::Float64(array.into()))
747    }
748
749    fn from_date32_array(&self, array: &arrow_array::Date32Array) -> Result<ArrayImpl, ArrayError> {
750        Ok(ArrayImpl::Date(array.into()))
751    }
752
753    fn from_time64us_array(
754        &self,
755        array: &arrow_array::Time64MicrosecondArray,
756    ) -> Result<ArrayImpl, ArrayError> {
757        Ok(ArrayImpl::Time(array.into()))
758    }
759
760    fn from_timestampsecond_array(
761        &self,
762        array: &arrow_array::TimestampSecondArray,
763    ) -> Result<ArrayImpl, ArrayError> {
764        Ok(ArrayImpl::Timestamp(array.into()))
765    }
766    fn from_timestampsecond_some_array(
767        &self,
768        array: &arrow_array::TimestampSecondArray,
769    ) -> Result<ArrayImpl, ArrayError> {
770        Ok(ArrayImpl::Timestamptz(array.into()))
771    }
772
773    fn from_timestampms_array(
774        &self,
775        array: &arrow_array::TimestampMillisecondArray,
776    ) -> Result<ArrayImpl, ArrayError> {
777        Ok(ArrayImpl::Timestamp(array.into()))
778    }
779
780    fn from_timestampms_some_array(
781        &self,
782        array: &arrow_array::TimestampMillisecondArray,
783    ) -> Result<ArrayImpl, ArrayError> {
784        Ok(ArrayImpl::Timestamptz(array.into()))
785    }
786
787    fn from_timestampus_array(
788        &self,
789        array: &arrow_array::TimestampMicrosecondArray,
790    ) -> Result<ArrayImpl, ArrayError> {
791        Ok(ArrayImpl::Timestamp(array.into()))
792    }
793
794    fn from_timestampus_some_array(
795        &self,
796        array: &arrow_array::TimestampMicrosecondArray,
797    ) -> Result<ArrayImpl, ArrayError> {
798        Ok(ArrayImpl::Timestamptz(array.into()))
799    }
800
801    fn from_timestampns_array(
802        &self,
803        array: &arrow_array::TimestampNanosecondArray,
804    ) -> Result<ArrayImpl, ArrayError> {
805        Ok(ArrayImpl::Timestamp(array.into()))
806    }
807
808    fn from_timestampns_some_array(
809        &self,
810        array: &arrow_array::TimestampNanosecondArray,
811    ) -> Result<ArrayImpl, ArrayError> {
812        Ok(ArrayImpl::Timestamptz(array.into()))
813    }
814
815    fn from_interval_array(
816        &self,
817        array: &arrow_array::IntervalMonthDayNanoArray,
818    ) -> Result<ArrayImpl, ArrayError> {
819        Ok(ArrayImpl::Interval(array.into()))
820    }
821
822    fn from_utf8_array(&self, array: &arrow_array::StringArray) -> Result<ArrayImpl, ArrayError> {
823        Ok(ArrayImpl::Utf8(array.into()))
824    }
825
826    fn from_binary_array(&self, array: &arrow_array::BinaryArray) -> Result<ArrayImpl, ArrayError> {
827        Ok(ArrayImpl::Bytea(array.into()))
828    }
829
830    fn from_large_utf8_array(
831        &self,
832        array: &arrow_array::LargeStringArray,
833    ) -> Result<ArrayImpl, ArrayError> {
834        Ok(ArrayImpl::Utf8(array.into()))
835    }
836
837    fn from_large_binary_array(
838        &self,
839        array: &arrow_array::LargeBinaryArray,
840    ) -> Result<ArrayImpl, ArrayError> {
841        Ok(ArrayImpl::Bytea(array.into()))
842    }
843
844    fn from_list_array(&self, array: &arrow_array::ListArray) -> Result<ArrayImpl, ArrayError> {
845        use arrow_array::Array;
846        let arrow_schema::DataType::List(field) = array.data_type() else {
847            panic!("nested field types cannot be determined.");
848        };
849        Ok(ArrayImpl::List(ListArray {
850            value: Box::new(self.from_array(field, array.values())?),
851            bitmap: match array.nulls() {
852                Some(nulls) => nulls.iter().collect(),
853                None => Bitmap::ones(array.len()),
854            },
855            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
856        }))
857    }
858
859    fn from_struct_array(&self, array: &arrow_array::StructArray) -> Result<ArrayImpl, ArrayError> {
860        use arrow_array::Array;
861        let arrow_schema::DataType::Struct(fields) = array.data_type() else {
862            panic!("nested field types cannot be determined.");
863        };
864        Ok(ArrayImpl::Struct(StructArray::new(
865            self.from_fields(fields)?,
866            array
867                .columns()
868                .iter()
869                .zip_eq_fast(fields)
870                .map(|(array, field)| self.from_array(field, array).map(Arc::new))
871                .try_collect()?,
872            (0..array.len()).map(|i| array.is_valid(i)).collect(),
873        )))
874    }
875
876    fn from_map_array(&self, array: &arrow_array::MapArray) -> Result<ArrayImpl, ArrayError> {
877        use arrow_array::Array;
878        let struct_array = self.from_struct_array(array.entries())?;
879        let list_array = ListArray {
880            value: Box::new(struct_array),
881            bitmap: match array.nulls() {
882                Some(nulls) => nulls.iter().collect(),
883                None => Bitmap::ones(array.len()),
884            },
885            offsets: array.offsets().iter().map(|o| *o as u32).collect(),
886        };
887
888        Ok(ArrayImpl::Map(MapArray { inner: list_array }))
889    }
890}
891
892impl From<&Bitmap> for arrow_buffer::NullBuffer {
893    fn from(bitmap: &Bitmap) -> Self {
894        bitmap.iter().collect()
895    }
896}
897
/// Generates `From` conversions in both directions between an array type and its Arrow
/// counterpart, plus a conversion from a slice of Arrow arrays (concatenated in order).
///
/// - `converts!(A, B)`: the items yielded by `iter()` are collected unchanged.
/// - `converts!(A, B, @map)`: every non-null item is additionally passed through
///   `FromIntoArrow::into_arrow` / `FromIntoArrow::from_arrow`.
macro_rules! converts {
    ($ArrayType:ty, $ArrowType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source.iter().collect::<Self>()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source.iter().collect::<Self>()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .collect::<Self>()
            }
        }
    };
    // Same three impls, but each value goes through `FromIntoArrow`.
    ($ArrayType:ty, $ArrowType:ty, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow()))
                    .collect()
            }
        }
        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                        })
                    })
                    .collect()
            }
        }
        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(raw)
                        })
                    })
                    .collect()
            }
        }
    };
}
951
/// Generates conversions between an array type and an Arrow array whose element types differ.
///
/// `$FromType` is the element type of `$ArrayType` and `$ToType` the element type of
/// `$ArrowType`. Three impls are produced:
///
/// - `From<&$ArrayType> for $ArrowType`, casting each value with `as $ToType`;
/// - `From<&$ArrowType> for $ArrayType`, casting each value with `as $FromType`;
/// - `From<&[$ArrowType]> for $ArrayType`, which concatenates the chunks in order.
///
/// Nulls are preserved. Because plain `as` casts are used, a value that does not fit the
/// target type wraps silently instead of producing an error.
///
/// The results are collected straight into the target array; no intermediate
/// `Vec<Option<_>>` is materialized.
macro_rules! converts_with_type {
    ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(array: &$ArrayType) -> Self {
                array.iter().map(|x| x.map(|v| v as $ToType)).collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(array: &$ArrowType) -> Self {
                array.iter().map(|x| x.map(|v| v as $FromType)).collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(arrays: &[$ArrowType]) -> Self {
                arrays
                    .iter()
                    .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType)))
                    .collect()
            }
        }
    };
}
982
/// Generates the same three conversions as `converts!(.., @map)` for arrays whose per-value
/// conversion depends on an Arrow time unit.
///
/// `$time_unit` is forwarded unchanged to `FromIntoArrowWithUnit::into_arrow_with_unit` and
/// `FromIntoArrowWithUnit::from_arrow_with_unit` for every non-null value.
macro_rules! converts_with_timeunit {
    ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {
        impl From<&$ArrayType> for $ArrowType {
            fn from(source: &$ArrayType) -> Self {
                source
                    .iter()
                    .map(|item| item.map(|value| value.into_arrow_with_unit($time_unit)))
                    .collect()
            }
        }

        impl From<&$ArrowType> for $ArrayType {
            fn from(source: &$ArrowType) -> Self {
                source
                    .iter()
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }

        impl From<&[$ArrowType]> for $ArrayType {
            fn from(chunks: &[$ArrowType]) -> Self {
                chunks
                    .iter()
                    .flat_map(|chunk| chunk.iter())
                    .map(|item| {
                        item.map(|raw| {
                            <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(raw, $time_unit)
                        })
                    })
                    .collect()
            }
        }
    };
}
1019
// Arrays converted element by element; the `@map` entries route every value through
// `FromIntoArrow`. Note that Arrow `Int64Array` backs both `I64Array` and `SerialArray`.
converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
converts!(I64Array, arrow_array::Int64Array);
converts!(F32Array, arrow_array::Float32Array, @map);
converts!(F64Array, arrow_array::Float64Array, @map);
converts!(BytesArray, arrow_array::BinaryArray);
converts!(BytesArray, arrow_array::LargeBinaryArray);
converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

// Arrow integer types without a same-width counterpart here: `Int8`/`UInt8` map to `I16Array`,
// `UInt16` to `I32Array` and `UInt32` to `I64Array`, using `as` casts in both directions.
converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8);
converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8);
converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16);
converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32);

// `TimestampArray` against each of the four Arrow timestamp units.
converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

// `TimestamptzArray` against the same four Arrow timestamp array types.
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
1049
/// Converts RisingWave value from and into Arrow value.
///
/// The `@map` arm of the `converts!` macro calls these methods for every non-null element.
trait FromIntoArrow {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// Builds the RisingWave value from its Arrow element representation.
    fn from_arrow(value: Self::ArrowType) -> Self;
    /// Consumes the RisingWave value and returns its Arrow element representation.
    fn into_arrow(self) -> Self::ArrowType;
}
1057
/// Converts RisingWave value from and into Arrow value.
/// Specifically used for converting timestamp types according to timeunit.
///
/// The `converts_with_timeunit!` macro calls these methods for every non-null element.
trait FromIntoArrowWithUnit {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
    type TimestampType;
    /// Builds the RisingWave value from an Arrow element expressed in `time_unit`.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
    /// Consumes the RisingWave value and returns its Arrow element expressed in `time_unit`.
    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}
1067
1068impl FromIntoArrow for Serial {
1069    type ArrowType = i64;
1070
1071    fn from_arrow(value: Self::ArrowType) -> Self {
1072        value.into()
1073    }
1074
1075    fn into_arrow(self) -> Self::ArrowType {
1076        self.into()
1077    }
1078}
1079
1080impl FromIntoArrow for F32 {
1081    type ArrowType = f32;
1082
1083    fn from_arrow(value: Self::ArrowType) -> Self {
1084        value.into()
1085    }
1086
1087    fn into_arrow(self) -> Self::ArrowType {
1088        self.into()
1089    }
1090}
1091
1092impl FromIntoArrow for F64 {
1093    type ArrowType = f64;
1094
1095    fn from_arrow(value: Self::ArrowType) -> Self {
1096        value.into()
1097    }
1098
1099    fn into_arrow(self) -> Self::ArrowType {
1100        self.into()
1101    }
1102}
1103
1104impl FromIntoArrow for Date {
1105    type ArrowType = i32;
1106
1107    fn from_arrow(value: Self::ArrowType) -> Self {
1108        Date(arrow_array::types::Date32Type::to_naive_date(value))
1109    }
1110
1111    fn into_arrow(self) -> Self::ArrowType {
1112        arrow_array::types::Date32Type::from_naive_date(self.0)
1113    }
1114}
1115
1116impl FromIntoArrow for Time {
1117    type ArrowType = i64;
1118
1119    fn from_arrow(value: Self::ArrowType) -> Self {
1120        Time(
1121            NaiveTime::from_num_seconds_from_midnight_opt(
1122                (value / 1_000_000) as _,
1123                (value % 1_000_000 * 1000) as _,
1124            )
1125            .unwrap(),
1126        )
1127    }
1128
1129    fn into_arrow(self) -> Self::ArrowType {
1130        self.0
1131            .signed_duration_since(NaiveTime::default())
1132            .num_microseconds()
1133            .unwrap()
1134    }
1135}
1136
1137impl FromIntoArrowWithUnit for Timestamp {
1138    type ArrowType = i64;
1139    type TimestampType = TimeUnit;
1140
1141    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1142        match time_unit {
1143            TimeUnit::Second => {
1144                Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
1145            }
1146            TimeUnit::Millisecond => {
1147                Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
1148            }
1149            TimeUnit::Microsecond => {
1150                Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
1151            }
1152            TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
1153        }
1154    }
1155
1156    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1157        match time_unit {
1158            TimeUnit::Second => self.0.and_utc().timestamp(),
1159            TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
1160            TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
1161            TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
1162        }
1163    }
1164}
1165
1166impl FromIntoArrowWithUnit for Timestamptz {
1167    type ArrowType = i64;
1168    type TimestampType = TimeUnit;
1169
1170    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
1171        match time_unit {
1172            TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
1173            TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
1174            TimeUnit::Microsecond => Timestamptz::from_micros(value),
1175            TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
1176        }
1177    }
1178
1179    fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
1180        match time_unit {
1181            TimeUnit::Second => self.timestamp(),
1182            TimeUnit::Millisecond => self.timestamp_millis(),
1183            TimeUnit::Microsecond => self.timestamp_micros(),
1184            TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
1185        }
1186    }
1187}
1188
1189impl FromIntoArrow for Interval {
1190    type ArrowType = ArrowIntervalType;
1191
1192    fn from_arrow(value: Self::ArrowType) -> Self {
1193        <ArrowIntervalType as crate::array::arrow::ArrowIntervalTypeTrait>::to_interval(value)
1194    }
1195
1196    fn into_arrow(self) -> Self::ArrowType {
1197        <ArrowIntervalType as crate::array::arrow::ArrowIntervalTypeTrait>::from_interval(self)
1198    }
1199}
1200
1201impl From<&DecimalArray> for arrow_array::LargeBinaryArray {
1202    fn from(array: &DecimalArray) -> Self {
1203        let mut builder =
1204            arrow_array::builder::LargeBinaryBuilder::with_capacity(array.len(), array.len() * 8);
1205        for value in array.iter() {
1206            builder.append_option(value.map(|d| d.to_string()));
1207        }
1208        builder.finish()
1209    }
1210}
1211
1212impl From<&DecimalArray> for arrow_array::StringArray {
1213    fn from(array: &DecimalArray) -> Self {
1214        let mut builder =
1215            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 8);
1216        for value in array.iter() {
1217            builder.append_option(value.map(|d| d.to_string()));
1218        }
1219        builder.finish()
1220    }
1221}
1222
1223// This arrow decimal type is used by iceberg source to read iceberg decimal into RW decimal.
1224impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray {
1225    type Error = ArrayError;
1226
1227    fn try_from(array: &arrow_array::Decimal128Array) -> Result<Self, Self::Error> {
1228        if array.scale() < 0 {
1229            bail!("support negative scale for arrow decimal")
1230        }
1231        let from_arrow = |value| {
1232            const NAN: i128 = i128::MIN + 1;
1233            let res = match value {
1234                NAN => Decimal::NaN,
1235                i128::MAX => Decimal::PositiveInf,
1236                i128::MIN => Decimal::NegativeInf,
1237                _ => Decimal::Normalized(
1238                    rust_decimal::Decimal::try_from_i128_with_scale(value, array.scale() as u32)
1239                        .map_err(ArrayError::internal)?,
1240                ),
1241            };
1242            Ok(res)
1243        };
1244        array
1245            .iter()
1246            .map(|o| o.map(from_arrow).transpose())
1247            .collect::<Result<Self, Self::Error>>()
1248    }
1249}
1250
1251// Since RisingWave does not support UInt type, convert UInt64Array to Decimal.
1252impl TryFrom<&arrow_array::UInt64Array> for DecimalArray {
1253    type Error = ArrayError;
1254
1255    fn try_from(array: &arrow_array::UInt64Array) -> Result<Self, Self::Error> {
1256        let from_arrow = |value| {
1257            // Convert the value to a Decimal with scale 0
1258            let res = Decimal::from(value);
1259            Ok(res)
1260        };
1261
1262        // Map over the array and convert each value
1263        array
1264            .iter()
1265            .map(|o| o.map(from_arrow).transpose())
1266            .collect::<Result<Self, Self::Error>>()
1267    }
1268}
1269
1270impl TryFrom<&arrow_array::Float16Array> for F32Array {
1271    type Error = ArrayError;
1272
1273    fn try_from(array: &arrow_array::Float16Array) -> Result<Self, Self::Error> {
1274        let from_arrow = |value| Ok(f32::from(value));
1275
1276        array
1277            .iter()
1278            .map(|o| o.map(from_arrow).transpose())
1279            .collect::<Result<Self, Self::Error>>()
1280    }
1281}
1282
1283impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray {
1284    type Error = ArrayError;
1285
1286    fn try_from(array: &arrow_array::LargeBinaryArray) -> Result<Self, Self::Error> {
1287        array
1288            .iter()
1289            .map(|o| {
1290                o.map(|s| {
1291                    let s = std::str::from_utf8(s)
1292                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))?;
1293                    s.parse()
1294                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1295                })
1296                .transpose()
1297            })
1298            .try_collect()
1299    }
1300}
1301
1302impl TryFrom<&arrow_array::StringArray> for DecimalArray {
1303    type Error = ArrayError;
1304
1305    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1306        array
1307            .iter()
1308            .map(|o| {
1309                o.map(|s| {
1310                    s.parse()
1311                        .map_err(|_| ArrayError::from_arrow(format!("invalid decimal: {s:?}")))
1312                })
1313                .transpose()
1314            })
1315            .try_collect()
1316    }
1317}
1318
1319impl From<&JsonbArray> for arrow_array::StringArray {
1320    fn from(array: &JsonbArray) -> Self {
1321        let mut builder =
1322            arrow_array::builder::StringBuilder::with_capacity(array.len(), array.len() * 16);
1323        for value in array.iter() {
1324            match value {
1325                Some(jsonb) => {
1326                    write!(&mut builder, "{}", jsonb).unwrap();
1327                    builder.append_value("");
1328                }
1329                None => builder.append_null(),
1330            }
1331        }
1332        builder.finish()
1333    }
1334}
1335
1336impl TryFrom<&arrow_array::StringArray> for JsonbArray {
1337    type Error = ArrayError;
1338
1339    fn try_from(array: &arrow_array::StringArray) -> Result<Self, Self::Error> {
1340        array
1341            .iter()
1342            .map(|o| {
1343                o.map(|s| {
1344                    s.parse()
1345                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1346                })
1347                .transpose()
1348            })
1349            .try_collect()
1350    }
1351}
1352
1353impl From<&JsonbArray> for arrow_array::LargeStringArray {
1354    fn from(array: &JsonbArray) -> Self {
1355        let mut builder =
1356            arrow_array::builder::LargeStringBuilder::with_capacity(array.len(), array.len() * 16);
1357        for value in array.iter() {
1358            match value {
1359                Some(jsonb) => {
1360                    write!(&mut builder, "{}", jsonb).unwrap();
1361                    builder.append_value("");
1362                }
1363                None => builder.append_null(),
1364            }
1365        }
1366        builder.finish()
1367    }
1368}
1369
1370impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
1371    type Error = ArrayError;
1372
1373    fn try_from(array: &arrow_array::LargeStringArray) -> Result<Self, Self::Error> {
1374        array
1375            .iter()
1376            .map(|o| {
1377                o.map(|s| {
1378                    s.parse()
1379                        .map_err(|_| ArrayError::from_arrow(format!("invalid json: {s}")))
1380                })
1381                .transpose()
1382            })
1383            .try_collect()
1384    }
1385}
1386
1387impl From<arrow_buffer::i256> for Int256 {
1388    fn from(value: arrow_buffer::i256) -> Self {
1389        let buffer = value.to_be_bytes();
1390        Int256::from_be_bytes(buffer)
1391    }
1392}
1393
1394impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
1395    fn from(val: Int256Ref<'a>) -> Self {
1396        let buffer = val.to_be_bytes();
1397        arrow_buffer::i256::from_be_bytes(buffer)
1398    }
1399}
1400
1401impl From<&Int256Array> for arrow_array::Decimal256Array {
1402    fn from(array: &Int256Array) -> Self {
1403        array
1404            .iter()
1405            .map(|o| o.map(arrow_buffer::i256::from))
1406            .collect()
1407    }
1408}
1409
1410impl From<&arrow_array::Decimal256Array> for Int256Array {
1411    fn from(array: &arrow_array::Decimal256Array) -> Self {
1412        let values = array.iter().map(|o| o.map(Int256::from)).collect_vec();
1413
1414        values
1415            .iter()
1416            .map(|i| i.as_ref().map(|v| v.as_scalar_ref()))
1417            .collect()
1418    }
1419}
1420
1421/// This function checks whether the schema of a Parquet file matches the user defined schema.
1422/// It handles the following special cases:
1423/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `TimeStamp` type.
1424/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `TimeStamptz` type.
1425/// - Since RisingWave does not have an `UInt` type:
1426///   - Arrow's `UInt8` matches with RisingWave's `Int16`.
1427///   - Arrow's `UInt16` matches with RisingWave's `Int32`.
1428///   - Arrow's `UInt32` matches with RisingWave's `Int64`.
1429///   - Arrow's `UInt64` matches with RisingWave's `Decimal`.
1430/// - Arrow's `Float16` matches with RisingWave's `Float32`.
1431pub fn is_parquet_schema_match_source_schema(
1432    arrow_data_type: &arrow_schema::DataType,
1433    rw_data_type: &crate::types::DataType,
1434) -> bool {
1435    matches!(
1436        (arrow_data_type, rw_data_type),
1437        (arrow_schema::DataType::Boolean, RwDataType::Boolean)
1438            | (
1439                arrow_schema::DataType::Int8
1440                    | arrow_schema::DataType::Int16
1441                    | arrow_schema::DataType::UInt8,
1442                RwDataType::Int16
1443            )
1444            | (
1445                arrow_schema::DataType::Int32 | arrow_schema::DataType::UInt16,
1446                RwDataType::Int32
1447            )
1448            | (
1449                arrow_schema::DataType::Int64 | arrow_schema::DataType::UInt32,
1450                RwDataType::Int64
1451            )
1452            | (
1453                arrow_schema::DataType::UInt64 | arrow_schema::DataType::Decimal128(_, _),
1454                RwDataType::Decimal
1455            )
1456            | (arrow_schema::DataType::Decimal256(_, _), RwDataType::Int256)
1457            | (
1458                arrow_schema::DataType::Float16 | arrow_schema::DataType::Float32,
1459                RwDataType::Float32
1460            )
1461            | (arrow_schema::DataType::Float64, RwDataType::Float64)
1462            | (
1463                arrow_schema::DataType::Timestamp(_, None),
1464                RwDataType::Timestamp
1465            )
1466            | (
1467                arrow_schema::DataType::Timestamp(_, Some(_)),
1468                RwDataType::Timestamptz
1469            )
1470            | (arrow_schema::DataType::Date32, RwDataType::Date)
1471            | (
1472                arrow_schema::DataType::Time32(_) | arrow_schema::DataType::Time64(_),
1473                RwDataType::Time
1474            )
1475            | (
1476                arrow_schema::DataType::Interval(IntervalUnit::MonthDayNano),
1477                RwDataType::Interval
1478            )
1479            | (
1480                arrow_schema::DataType::Utf8 | arrow_schema::DataType::LargeUtf8,
1481                RwDataType::Varchar
1482            )
1483            | (
1484                arrow_schema::DataType::Binary | arrow_schema::DataType::LargeBinary,
1485                RwDataType::Bytea
1486            )
1487            | (arrow_schema::DataType::List(_), RwDataType::List(_))
1488            | (arrow_schema::DataType::Struct(_), RwDataType::Struct(_))
1489            | (arrow_schema::DataType::Map(_, _), RwDataType::Map(_))
1490    )
1491}
1492
1493#[cfg(test)]
1494mod tests {
1495
1496    use super::*;
1497
1498    #[test]
1499    fn bool() {
1500        let array = BoolArray::from_iter([None, Some(false), Some(true)]);
1501        let arrow = arrow_array::BooleanArray::from(&array);
1502        assert_eq!(BoolArray::from(&arrow), array);
1503    }
1504
1505    #[test]
1506    fn i16() {
1507        let array = I16Array::from_iter([None, Some(-7), Some(25)]);
1508        let arrow = arrow_array::Int16Array::from(&array);
1509        assert_eq!(I16Array::from(&arrow), array);
1510    }
1511
1512    #[test]
1513    fn i32() {
1514        let array = I32Array::from_iter([None, Some(-7), Some(25)]);
1515        let arrow = arrow_array::Int32Array::from(&array);
1516        assert_eq!(I32Array::from(&arrow), array);
1517    }
1518
1519    #[test]
1520    fn i64() {
1521        let array = I64Array::from_iter([None, Some(-7), Some(25)]);
1522        let arrow = arrow_array::Int64Array::from(&array);
1523        assert_eq!(I64Array::from(&arrow), array);
1524    }
1525
1526    #[test]
1527    fn f32() {
1528        let array = F32Array::from_iter([None, Some(-7.0), Some(25.0)]);
1529        let arrow = arrow_array::Float32Array::from(&array);
1530        assert_eq!(F32Array::from(&arrow), array);
1531    }
1532
1533    #[test]
1534    fn f64() {
1535        let array = F64Array::from_iter([None, Some(-7.0), Some(25.0)]);
1536        let arrow = arrow_array::Float64Array::from(&array);
1537        assert_eq!(F64Array::from(&arrow), array);
1538    }
1539
1540    #[test]
1541    fn int8() {
1542        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(-128), Some(127)]);
1543        let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]);
1544        let converted: PrimitiveArray<i16> = (&arr).into();
1545        assert_eq!(converted, array);
1546    }
1547
1548    #[test]
1549    fn uint8() {
1550        let array: PrimitiveArray<i16> = I16Array::from_iter([None, Some(7), Some(25)]);
1551        let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]);
1552        let converted: PrimitiveArray<i16> = (&arr).into();
1553        assert_eq!(converted, array);
1554    }
1555
1556    #[test]
1557    fn uint16() {
1558        let array: PrimitiveArray<i32> = I32Array::from_iter([None, Some(7), Some(65535)]);
1559        let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]);
1560        let converted: PrimitiveArray<i32> = (&arr).into();
1561        assert_eq!(converted, array);
1562    }
1563
1564    #[test]
1565    fn uint32() {
1566        let array: PrimitiveArray<i64> = I64Array::from_iter([None, Some(7), Some(4294967295)]);
1567        let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]);
1568        let converted: PrimitiveArray<i64> = (&arr).into();
1569        assert_eq!(converted, array);
1570    }
1571
1572    #[test]
1573    fn uint64() {
1574        let array: PrimitiveArray<Decimal> = DecimalArray::from_iter([
1575            None,
1576            Some(Decimal::Normalized("7".parse().unwrap())),
1577            Some(Decimal::Normalized("18446744073709551615".parse().unwrap())),
1578        ]);
1579        let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]);
1580        let converted: PrimitiveArray<Decimal> = (&arr).try_into().unwrap();
1581        assert_eq!(converted, array);
1582    }
1583
1584    #[test]
1585    fn date() {
1586        let array = DateArray::from_iter([
1587            None,
1588            Date::with_days_since_ce(12345).ok(),
1589            Date::with_days_since_ce(-12345).ok(),
1590        ]);
1591        let arrow = arrow_array::Date32Array::from(&array);
1592        assert_eq!(DateArray::from(&arrow), array);
1593    }
1594
1595    #[test]
1596    fn time() {
1597        let array = TimeArray::from_iter([None, Time::with_micro(24 * 3600 * 1_000_000 - 1).ok()]);
1598        let arrow = arrow_array::Time64MicrosecondArray::from(&array);
1599        assert_eq!(TimeArray::from(&arrow), array);
1600    }
1601
1602    #[test]
1603    fn timestamp() {
1604        let array =
1605            TimestampArray::from_iter([None, Timestamp::with_micros(123456789012345678).ok()]);
1606        let arrow = arrow_array::TimestampMicrosecondArray::from(&array);
1607        assert_eq!(TimestampArray::from(&arrow), array);
1608    }
1609
1610    #[test]
1611    fn interval() {
1612        let array = IntervalArray::from_iter([
1613            None,
1614            Some(Interval::from_month_day_usec(
1615                1_000_000,
1616                1_000,
1617                1_000_000_000,
1618            )),
1619            Some(Interval::from_month_day_usec(
1620                -1_000_000,
1621                -1_000,
1622                -1_000_000_000,
1623            )),
1624        ]);
1625        let arrow = arrow_array::IntervalMonthDayNanoArray::from(&array);
1626        assert_eq!(IntervalArray::from(&arrow), array);
1627    }
1628
1629    #[test]
1630    fn string() {
1631        let array = Utf8Array::from_iter([None, Some("array"), Some("arrow")]);
1632        let arrow = arrow_array::StringArray::from(&array);
1633        assert_eq!(Utf8Array::from(&arrow), array);
1634    }
1635
1636    #[test]
1637    fn binary() {
1638        let array = BytesArray::from_iter([None, Some("array".as_bytes())]);
1639        let arrow = arrow_array::BinaryArray::from(&array);
1640        assert_eq!(BytesArray::from(&arrow), array);
1641    }
1642
1643    #[test]
1644    fn decimal() {
1645        let array = DecimalArray::from_iter([
1646            None,
1647            Some(Decimal::NaN),
1648            Some(Decimal::PositiveInf),
1649            Some(Decimal::NegativeInf),
1650            Some(Decimal::Normalized("123.4".parse().unwrap())),
1651            Some(Decimal::Normalized("123.456".parse().unwrap())),
1652        ]);
1653        let arrow = arrow_array::LargeBinaryArray::from(&array);
1654        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
1655
1656        let arrow = arrow_array::StringArray::from(&array);
1657        assert_eq!(DecimalArray::try_from(&arrow).unwrap(), array);
1658    }
1659
1660    #[test]
1661    fn jsonb() {
1662        let array = JsonbArray::from_iter([
1663            None,
1664            Some("null".parse().unwrap()),
1665            Some("false".parse().unwrap()),
1666            Some("1".parse().unwrap()),
1667            Some("[1, 2, 3]".parse().unwrap()),
1668            Some(r#"{ "a": 1, "b": null }"#.parse().unwrap()),
1669        ]);
1670        let arrow = arrow_array::LargeStringArray::from(&array);
1671        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
1672
1673        let arrow = arrow_array::StringArray::from(&array);
1674        assert_eq!(JsonbArray::try_from(&arrow).unwrap(), array);
1675    }
1676
1677    #[test]
1678    fn int256() {
1679        let values = [
1680            None,
1681            Some(Int256::from(1)),
1682            Some(Int256::from(i64::MAX)),
1683            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX)),
1684            Some(Int256::from(i64::MAX) * Int256::from(i64::MAX) * Int256::from(i64::MAX)),
1685            Some(
1686                Int256::from(i64::MAX)
1687                    * Int256::from(i64::MAX)
1688                    * Int256::from(i64::MAX)
1689                    * Int256::from(i64::MAX),
1690            ),
1691            Some(Int256::min_value()),
1692            Some(Int256::max_value()),
1693        ];
1694
1695        let array =
1696            Int256Array::from_iter(values.iter().map(|r| r.as_ref().map(|x| x.as_scalar_ref())));
1697        let arrow = arrow_array::Decimal256Array::from(&array);
1698        assert_eq!(Int256Array::from(&arrow), array);
1699    }
1700}