risingwave_expr_impl/scalar/external/
iceberg.rs
1use std::fmt::Formatter;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use anyhow::anyhow;
22use iceberg::spec::{PrimitiveType, Transform, Type as IcebergType};
23use iceberg::transform::{BoxedTransformFunction, create_transform_function};
24use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
25use risingwave_common::array::{ArrayRef, DataChunk};
26use risingwave_common::ensure;
27use risingwave_common::row::OwnedRow;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_expr::expr::BoxedExpression;
30use risingwave_expr::{ExprError, Result, build_function};
31use thiserror_ext::AsReport;
32
33pub struct IcebergTransform {
34 child: BoxedExpression,
35 transform: BoxedTransformFunction,
36 input_arrow_type: arrow_schema_iceberg::DataType,
37 output_arrow_field: arrow_schema_iceberg::Field,
38 return_type: DataType,
39}
40
41impl std::fmt::Debug for IcebergTransform {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("IcebergTransform")
44 .field("child", &self.child)
45 .field("return_type", &self.return_type)
46 .finish()
47 }
48}
49
50#[async_trait::async_trait]
51impl risingwave_expr::expr::Expression for IcebergTransform {
52 fn return_type(&self) -> DataType {
53 self.return_type.clone()
54 }
55
56 async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> {
57 let array = self.child.eval(data_chunk).await?;
59 let arrow_array = IcebergArrowConvert.to_arrow_array(&self.input_arrow_type, &array)?;
61 let res_array = self.transform.transform(arrow_array).unwrap();
63 Ok(Arc::new(IcebergArrowConvert.array_from_arrow_array(
65 &self.output_arrow_field,
66 &res_array,
67 )?))
68 }
69
70 async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> {
71 Err(ExprError::Internal(anyhow!(
72 "eval_row in iceberg_transform is not supported yet"
73 )))
74 }
75}
76
77#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "unreachable")]
78fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
79 let transform_type = {
80 let datum = children[0].eval_const()?.unwrap();
81 let str = datum.as_utf8();
82 Transform::from_str(str).map_err(|_| ExprError::InvalidParam {
83 name: "transform type in iceberg_transform",
84 reason: format!("Fail to parse {str} as iceberg transform type").into(),
85 })?
86 };
87
88 assert!(!matches!(
91 transform_type,
92 Transform::Identity | Transform::Void
93 ));
94
95 let input_arrow_type = IcebergArrowConvert
99 .to_arrow_field("", &children[1].return_type())?
100 .data_type()
101 .clone();
102 let output_arrow_field = IcebergArrowConvert.to_arrow_field("", &return_type)?;
103 let input_type = iceberg::arrow::arrow_type_to_type(&input_arrow_type).map_err(|err| {
104 ExprError::InvalidParam {
105 name: "input type in iceberg_transform",
106 reason: format!(
107 "Failed to convert input type to iceberg type, got error: {}",
108 err.as_report()
109 )
110 .into(),
111 }
112 })?;
113 let expect_res_type = transform_type.result_type(&input_type).map_err(
114 |err| ExprError::InvalidParam {
115 name: "input type in iceberg_transform",
116 reason: format!(
117 "Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
118 transform_type, input_type, err.as_report()
119 )
120 .into()
121 })?;
122 let actual_res_type = iceberg::arrow::arrow_type_to_type(
123 &IcebergArrowConvert
124 .to_arrow_field("", &return_type)?
125 .data_type()
126 .clone(),
127 )
128 .map_err(|err| ExprError::InvalidParam {
129 name: "return type in iceberg_transform",
130 reason: format!(
131 "Failed to convert return type to iceberg type, got error: {}",
132 err.as_report()
133 )
134 .into(),
135 })?;
136
137 ensure!(
138 (expect_res_type == actual_res_type)
139 ||
140 (matches!(transform_type, Transform::Day) && matches!(actual_res_type, IcebergType::Primitive(PrimitiveType::Int))),
142 ExprError::InvalidParam {
143 name: "return type in iceberg_transform",
144 reason: format!(
145 "Expect return type {:?} but got {:?}, RisingWave return type is {:?}, input type is {:?}, transform type is {:?}",
146 expect_res_type,
147 actual_res_type,
148 return_type,
149 (input_type, input_arrow_type),
150 transform_type
151 )
152 .into()
153 }
154 );
155
156 Ok(Box::new(IcebergTransform {
157 child: children.remove(1),
158 transform: create_transform_function(&transform_type)
159 .map_err(|err| ExprError::Internal(err.into()))?,
160 input_arrow_type,
161 output_arrow_field,
162 return_type,
163 }))
164}
165
#[cfg(test)]
mod test {
    use risingwave_common::array::{ArrayRef, DataChunk, DataChunkTestExt};
    use risingwave_expr::expr::build_from_pretty;

    /// Builds the expression described by `expr`, evaluates it on `input` and asserts that
    /// the result equals `expected`.
    async fn check(expr: &str, input: &DataChunk, expected: &ArrayRef) {
        let expression = build_from_pretty(expr);
        let actual = expression.eval(input).await.unwrap();
        assert_eq!(&actual, expected);
    }

    #[tokio::test]
    async fn test_bucket() {
        let (input, expected) = DataChunk::from_pretty(
            "i i
             34 1373",
        )
        .split_column_at(1);
        check(
            "(iceberg_transform:int4 bucket[2017]:varchar $0:int)",
            &input,
            expected.column_at(0),
        )
        .await;
    }

    #[tokio::test]
    async fn test_truncate() {
        let (input, expected) = DataChunk::from_pretty(
            "T T
             iceberg ice
             risingwave ris
             delta del",
        )
        .split_column_at(1);
        check(
            "(iceberg_transform:varchar truncate[3]:varchar $0:varchar)",
            &input,
            expected.column_at(0),
        )
        .await;
    }

    #[tokio::test]
    async fn test_year_month_day_hour() {
        let (input, expected) = DataChunk::from_pretty(
            "TZ i i D i
             1970-01-01T00:00:00.000000000+00:00 0 0 1970-01-01 0
             1971-02-01T01:00:00.000000000+00:00 1 13 1971-02-01 9505
             1972-03-01T02:00:00.000000000+00:00 2 26 1972-03-01 18962
             1970-05-01T06:00:00.000000000+00:00 0 4 1970-05-01 2886
             1970-06-01T07:00:00.000000000+00:00 0 5 1970-06-01 3631",
        )
        .split_column_at(1);

        // Expected column `i` holds the result of the `i`-th transform in this list.
        let transforms = ["year", "month", "day", "hour"];
        for (column, transform) in transforms.into_iter().enumerate() {
            let expr = format!("(iceberg_transform:int4 {transform}:varchar $0:timestamptz)");
            check(expr.as_str(), &input, expected.column_at(column)).await;
        }
    }
}