risingwave_common/array/arrow/
arrow_deltalake.rs1use std::ops::{Div, Mul};
21use std::sync::Arc;
22
23use arrow_array::ArrayRef;
24use num_traits::abs;
25
26pub use super::arrow_52::{
27 FromArrow, ToArrow, arrow_array, arrow_buffer, arrow_cast, arrow_schema,
28};
29use crate::array::{Array, ArrayError, DataChunk, Decimal, DecimalArray};
30
/// Stateless converter type.
///
/// Its [`ToArrow`] implementation in this file overrides `decimal_to_arrow` so that
/// decimal arrays are emitted as Arrow `Decimal128` arrays using the precision and
/// scale requested by the target Arrow data type.
pub struct DeltaLakeConvert;
32
33impl DeltaLakeConvert {
34 pub fn to_record_batch(
35 &self,
36 schema: arrow_schema::SchemaRef,
37 chunk: &DataChunk,
38 ) -> Result<arrow_array::RecordBatch, ArrayError> {
39 ToArrow::to_record_batch(self, schema, chunk)
40 }
41
42 fn decimal_to_i128(decimal: Decimal, precision: u8, max_scale: i8) -> Option<i128> {
43 match decimal {
44 crate::array::Decimal::Normalized(e) => {
45 let value = e.mantissa();
46 let scale = e.scale() as i8;
47 let diff_scale = abs(max_scale - scale);
48 let value = match scale {
49 _ if scale < max_scale => value.mul(10_i32.pow(diff_scale as u32) as i128),
50 _ if scale > max_scale => value.div(10_i32.pow(diff_scale as u32) as i128),
51 _ => value,
52 };
53 Some(value)
54 }
55 crate::array::Decimal::PositiveInf => {
57 let max_value = 10_i128.pow(precision as u32) - 1;
58 Some(max_value)
59 }
60 crate::array::Decimal::NegativeInf => {
61 let max_value = 10_i128.pow(precision as u32) - 1;
62 Some(-max_value)
63 }
64 crate::array::Decimal::NaN => None,
65 }
66 }
67}
68
69impl ToArrow for DeltaLakeConvert {
70 fn decimal_to_arrow(
71 &self,
72 data_type: &arrow_schema::DataType,
73 array: &DecimalArray,
74 ) -> Result<arrow_array::ArrayRef, ArrayError> {
75 let (precision, max_scale) = match data_type {
76 arrow_schema::DataType::Decimal128(precision, scale) => (*precision, *scale),
77 _ => return Err(ArrayError::to_arrow("Invalid decimal type")),
78 };
79
80 let values: Vec<Option<i128>> = array
82 .iter()
83 .map(|e| e.and_then(|e| DeltaLakeConvert::decimal_to_i128(e, precision, max_scale)))
84 .collect();
85
86 let array = arrow_array::Decimal128Array::from(values)
87 .with_precision_and_scale(precision, max_scale)
88 .map_err(ArrayError::from_arrow)?;
89 Ok(Arc::new(array) as ArrayRef)
90 }
91}
92
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::cast::AsArray;
    use arrow_schema::Field;

    use super::*;
    use crate::array::arrow::arrow_deltalake::DeltaLakeConvert;
    use crate::array::{ArrayImpl, Decimal, DecimalArray, ListArray, ListValue};
    use crate::bitmap::Bitmap;

    /// A single-row chunk holding a list of decimals (null, NaN, both infinities and two
    /// finite values) is converted against a `List<Decimal128(10, 0)>` schema, and the
    /// resulting list element is compared with the expected `Decimal128` array.
    #[test]
    fn test_decimal_list_chunk() {
        // Input: one list value covering every `Decimal` variant plus a null.
        let decimals = DecimalArray::from_iter([
            None,
            Some(Decimal::NaN),
            Some(Decimal::PositiveInf),
            Some(Decimal::NegativeInf),
            Some(Decimal::Normalized("1".parse().unwrap())),
            Some(Decimal::Normalized("123.456".parse().unwrap())),
        ]);
        let list_value = ListValue::new(ArrayImpl::Decimal(decimals));
        let list_column = Arc::new(ArrayImpl::List(ListArray::from_iter(vec![list_value])));
        let chunk = crate::array::DataChunk::new(vec![list_column], Bitmap::ones(1));

        // Target schema: a non-nullable list column of nullable Decimal128(10, 0) items.
        let item_field = Field::new("test", arrow_schema::DataType::Decimal128(10, 0), true);
        let list_field = Field::new(
            "test",
            arrow_schema::DataType::List(Arc::new(item_field)),
            false,
        );
        let schema = arrow_schema::Schema::new(vec![list_field]);

        let record_batch = DeltaLakeConvert
            .to_record_batch(Arc::new(schema), &chunk)
            .unwrap();

        // Null and NaN become null, infinities saturate to ten nines, and 123.456 is
        // truncated to scale 0.
        let expected_values = arrow_array::Decimal128Array::from(vec![
            None,
            None,
            Some(9999999999),
            Some(-9999999999),
            Some(1),
            Some(123),
        ])
        .with_precision_and_scale(10, 0)
        .unwrap();
        let expect_array = Arc::new(expected_values) as ArrayRef;

        let actual = record_batch.column(0).as_list::<i32>().value(0);
        assert_eq!(&actual, &expect_array);
    }
}