risingwave_common/array/arrow/arrow_deltalake.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This is for arrow dependency named `arrow-xxx-deltalake` such as `arrow-array-deltalake`
16//! in the cargo workspace.
17//!
18//! The corresponding version of arrow is currently used by `deltalake` sink.
19
20use std::ops::{Div, Mul};
21use std::sync::Arc;
22
23use arrow_array::ArrayRef;
24use num_traits::abs;
25
26pub use super::arrow_52::{
27    FromArrow, ToArrow, arrow_array, arrow_buffer, arrow_cast, arrow_schema,
28};
29use crate::array::{Array, ArrayError, DataChunk, Decimal, DecimalArray};
30
31pub struct DeltaLakeConvert;
32
33impl DeltaLakeConvert {
34    pub fn to_record_batch(
35        &self,
36        schema: arrow_schema::SchemaRef,
37        chunk: &DataChunk,
38    ) -> Result<arrow_array::RecordBatch, ArrayError> {
39        ToArrow::to_record_batch(self, schema, chunk)
40    }
41
42    fn decimal_to_i128(decimal: Decimal, precision: u8, max_scale: i8) -> Option<i128> {
43        match decimal {
44            crate::array::Decimal::Normalized(e) => {
45                let value = e.mantissa();
46                let scale = e.scale() as i8;
47                let diff_scale = abs(max_scale - scale);
48                let value = match scale {
49                    _ if scale < max_scale => value.mul(10_i32.pow(diff_scale as u32) as i128),
50                    _ if scale > max_scale => value.div(10_i32.pow(diff_scale as u32) as i128),
51                    _ => value,
52                };
53                Some(value)
54            }
55            // For Inf, we replace them with the max/min value within the precision.
56            crate::array::Decimal::PositiveInf => {
57                let max_value = 10_i128.pow(precision as u32) - 1;
58                Some(max_value)
59            }
60            crate::array::Decimal::NegativeInf => {
61                let max_value = 10_i128.pow(precision as u32) - 1;
62                Some(-max_value)
63            }
64            crate::array::Decimal::NaN => None,
65        }
66    }
67}
68
69impl ToArrow for DeltaLakeConvert {
70    fn decimal_to_arrow(
71        &self,
72        data_type: &arrow_schema::DataType,
73        array: &DecimalArray,
74    ) -> Result<arrow_array::ArrayRef, ArrayError> {
75        let (precision, max_scale) = match data_type {
76            arrow_schema::DataType::Decimal128(precision, scale) => (*precision, *scale),
77            _ => return Err(ArrayError::to_arrow("Invalid decimal type")),
78        };
79
80        // Convert Decimal to i128:
81        let values: Vec<Option<i128>> = array
82            .iter()
83            .map(|e| e.and_then(|e| DeltaLakeConvert::decimal_to_i128(e, precision, max_scale)))
84            .collect();
85
86        let array = arrow_array::Decimal128Array::from(values)
87            .with_precision_and_scale(precision, max_scale)
88            .map_err(ArrayError::from_arrow)?;
89        Ok(Arc::new(array) as ArrayRef)
90    }
91}
92
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::cast::AsArray;
    use arrow_schema::Field;

    use super::*;
    use crate::array::arrow::arrow_deltalake::DeltaLakeConvert;
    use crate::array::{ArrayImpl, Decimal, DecimalArray, ListArray, ListValue};
    use crate::bitmap::Bitmap;

    /// Writes a one-row list of decimals (null, NaN, ±Inf and finite values) into a
    /// `List<Decimal128(10, 0)>` column. NaN becomes null, infinities become
    /// `±(10^10 - 1)`, and `123.456` is truncated to scale 0.
    #[test]
    fn test_decimal_list_chunk() {
        let decimals = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("1".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let row = ListValue::new(ArrayImpl::Decimal(decimals));
        let column = Arc::new(ArrayImpl::List(ListArray::from_iter(vec![row])));
        let chunk = crate::array::DataChunk::new(vec![column], Bitmap::ones(1));

        let element = Field::new("test", arrow_schema::DataType::Decimal128(10, 0), true);
        let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
            "test",
            arrow_schema::DataType::List(Arc::new(element)),
            false,
        )]));

        let batch = DeltaLakeConvert.to_record_batch(schema, &chunk).unwrap();

        let expected: ArrayRef = Arc::new(
            arrow_array::Decimal128Array::from(vec![
                None,
                None,
                Some(9_999_999_999),
                Some(-9_999_999_999),
                Some(1),
                Some(123),
            ])
            .with_precision_and_scale(10, 0)
            .unwrap(),
        );
        let actual = batch.column(0).as_list::<i32>().value(0);
        assert_eq!(&actual, &expected);
    }
}