risingwave_expr_impl/scalar/external/
iceberg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains the expression for computing the iceberg partition value.
16//! spec ref: <https://iceberg.apache.org/spec/#partition-transforms>
17use std::fmt::Formatter;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use anyhow::anyhow;
22use iceberg::spec::{PrimitiveType, Transform, Type as IcebergType};
23use iceberg::transform::{BoxedTransformFunction, create_transform_function};
24use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
25use risingwave_common::array::{ArrayRef, DataChunk};
26use risingwave_common::ensure;
27use risingwave_common::row::OwnedRow;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_expr::expr::BoxedExpression;
30use risingwave_expr::{ExprError, Result, build_function};
31use thiserror_ext::AsReport;
32
33pub struct IcebergTransform {
34    child: BoxedExpression,
35    transform: BoxedTransformFunction,
36    input_arrow_type: arrow_schema_iceberg::DataType,
37    output_arrow_field: arrow_schema_iceberg::Field,
38    return_type: DataType,
39}
40
41impl std::fmt::Debug for IcebergTransform {
42    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("IcebergTransform")
44            .field("child", &self.child)
45            .field("return_type", &self.return_type)
46            .finish()
47    }
48}
49
50#[async_trait::async_trait]
51impl risingwave_expr::expr::Expression for IcebergTransform {
52    fn return_type(&self) -> DataType {
53        self.return_type.clone()
54    }
55
56    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> {
57        // Get the child array
58        let array = self.child.eval(data_chunk).await?;
59        // Convert to arrow array
60        let arrow_array = IcebergArrowConvert.to_arrow_array(&self.input_arrow_type, &array)?;
61        // Transform
62        let res_array = self.transform.transform(arrow_array).unwrap();
63        // Convert back to array ref and return it
64        Ok(Arc::new(IcebergArrowConvert.array_from_arrow_array(
65            &self.output_arrow_field,
66            &res_array,
67        )?))
68    }
69
70    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> {
71        Err(ExprError::Internal(anyhow!(
72            "eval_row in iceberg_transform is not supported yet"
73        )))
74    }
75}
76
77#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "unreachable")]
78fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
79    let transform_type = {
80        let datum = children[0].eval_const()?.unwrap();
81        let str = datum.as_utf8();
82        Transform::from_str(str).map_err(|_| ExprError::InvalidParam {
83            name: "transform type in iceberg_transform",
84            reason: format!("Fail to parse {str} as iceberg transform type").into(),
85        })?
86    };
87
88    // For Identity and Void transform, we will use `InputRef` and const null in frontend,
89    // so it should not reach here.
90    assert!(!matches!(
91        transform_type,
92        Transform::Identity | Transform::Void
93    ));
94
95    // Check type:
96    // 1. input type can be transform successfully
97    // 2. return type is the same as the result type
98    let input_arrow_type = IcebergArrowConvert
99        .to_arrow_field("", &children[1].return_type())?
100        .data_type()
101        .clone();
102    let output_arrow_field = IcebergArrowConvert.to_arrow_field("", &return_type)?;
103    let input_type = iceberg::arrow::arrow_type_to_type(&input_arrow_type).map_err(|err| {
104        ExprError::InvalidParam {
105            name: "input type in iceberg_transform",
106            reason: format!(
107                "Failed to convert input type to iceberg type, got error: {}",
108                err.as_report()
109            )
110            .into(),
111        }
112    })?;
113    let expect_res_type = transform_type.result_type(&input_type).map_err(
114        |err| ExprError::InvalidParam {
115            name: "input type in iceberg_transform",
116            reason: format!(
117                "Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
118                transform_type, input_type, err.as_report()
119            )
120            .into()
121        })?;
122    let actual_res_type = iceberg::arrow::arrow_type_to_type(
123        &IcebergArrowConvert
124            .to_arrow_field("", &return_type)?
125            .data_type()
126            .clone(),
127    )
128    .map_err(|err| ExprError::InvalidParam {
129        name: "return type in iceberg_transform",
130        reason: format!(
131            "Failed to convert return type to iceberg type, got error: {}",
132            err.as_report()
133        )
134        .into(),
135    })?;
136
137    ensure!(
138        (expect_res_type == actual_res_type)
139            ||
140            // This is a confusing stuff.<https://github.com/apache/iceberg/pull/11749>
141            (matches!(transform_type, Transform::Day) && matches!(actual_res_type, IcebergType::Primitive(PrimitiveType::Int))),
142        ExprError::InvalidParam {
143            name: "return type in iceberg_transform",
144            reason: format!(
145                "Expect return type {:?} but got {:?}, RisingWave return type is {:?}, input type is {:?}, transform type is {:?}",
146                expect_res_type,
147                actual_res_type,
148                return_type,
149                (input_type, input_arrow_type),
150                transform_type
151            )
152            .into()
153        }
154    );
155
156    Ok(Box::new(IcebergTransform {
157        child: children.remove(1),
158        transform: create_transform_function(&transform_type)
159            .map_err(|err| ExprError::Internal(err.into()))?,
160        input_arrow_type,
161        output_arrow_field,
162        return_type,
163    }))
164}
165
#[cfg(test)]
mod test {
    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_expr::expr::build_from_pretty;

    /// Builds `expr_text`, evaluates it over `input`, and asserts the result equals
    /// column `col` of `expected`.
    async fn check(expr_text: &str, input: &DataChunk, expected: &DataChunk, col: usize) {
        let expr = build_from_pretty(expr_text);
        let actual = expr.eval(input).await.unwrap();
        assert_eq!(actual, *expected.column_at(col));
    }

    #[tokio::test]
    async fn test_bucket() {
        let (input, expected) = DataChunk::from_pretty(
            "i   i
             34  1373",
        )
        .split_column_at(1);
        check(
            "(iceberg_transform:int4 bucket[2017]:varchar $0:int)",
            &input,
            &expected,
            0,
        )
        .await;
    }

    #[tokio::test]
    async fn test_truncate() {
        let (input, expected) = DataChunk::from_pretty(
            "T         T
            iceberg   ice
            risingwave ris
            delta     del",
        )
        .split_column_at(1);
        check(
            "(iceberg_transform:varchar truncate[3]:varchar $0:varchar)",
            &input,
            &expected,
            0,
        )
        .await;
    }

    #[tokio::test]
    async fn test_year_month_day_hour() {
        let (input, expected) = DataChunk::from_pretty(
            "TZ                                  i i D i
            1970-01-01T00:00:00.000000000+00:00  0 0 1970-01-01 0
            1971-02-01T01:00:00.000000000+00:00  1 13 1971-02-01 9505
            1972-03-01T02:00:00.000000000+00:00  2 26 1972-03-01 18962
            1970-05-01T06:00:00.000000000+00:00  0 4 1970-05-01 2886
            1970-06-01T07:00:00.000000000+00:00  0 5 1970-06-01 3631",
        )
        .split_column_at(1);

        // Each transform is checked against its own expected column, in order.
        let cases = [
            ("(iceberg_transform:int4 year:varchar $0:timestamptz)", 0),
            ("(iceberg_transform:int4 month:varchar $0:timestamptz)", 1),
            ("(iceberg_transform:int4 day:varchar $0:timestamptz)", 2),
            ("(iceberg_transform:int4 hour:varchar $0:timestamptz)", 3),
        ];
        for (expr_text, col) in cases {
            check(expr_text, &input, &expected, col).await;
        }
    }
}