// risingwave_connector/sink/iceberg/create_table.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::str::FromStr;
17
18use anyhow::{Context, anyhow};
19use iceberg::arrow::schema_to_arrow_schema;
20use iceberg::spec::{TableProperties, Transform, UnboundPartitionField, UnboundPartitionSpec};
21use iceberg::table::Table;
22use iceberg::{NamespaceIdent, TableCreation};
23use regex::Regex;
24use risingwave_common::array::arrow::arrow_schema_iceberg::{
25    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
26    Schema as ArrowSchema,
27};
28use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
29use risingwave_common::bail;
30use risingwave_common::catalog::Schema;
31use url::Url;
32
33use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
34use crate::sink::{Result, SinkParam};
35
36pub(super) async fn create_and_validate_table_impl(
37    config: &IcebergConfig,
38    param: &SinkParam,
39) -> Result<Table> {
40    if config.create_table_if_not_exists {
41        create_table_if_not_exists_impl(config, param).await?;
42    }
43
44    let table = config
45        .load_table()
46        .await
47        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
48
49    let sink_schema = param.schema();
50    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
51        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
52
53    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
54        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
55
56    Ok(table)
57}
58
/// Create the Iceberg namespace and table described by `config` if they do
/// not exist yet; no-op when the table is already present in the catalog.
///
/// The table schema is derived from the sink columns
/// (RisingWave -> Arrow -> Iceberg), the location from the configured
/// warehouse path, and the partition spec from the optional `partition_by`
/// expression.
pub(super) async fn create_table_if_not_exists_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<()> {
    let catalog = config.create_catalog().await?;
    // A database (namespace) name is mandatory for table creation; create
    // the namespace first if it is missing.
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        // convert risingwave schema -> arrow schema -> iceberg schema
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        // Compute the explicit table location `<warehouse>/<db>/<table>`
        // when a parseable warehouse path is configured; `None` lets the
        // catalog pick the location itself.
        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        // For rest catalog, the warehouse_path could be a warehouse name.
                        // In this case, we should specify the location when creating a table.
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        // Build an unbound partition spec from the user-supplied
        // `partition_by` expression, if any. Partition field ids start at
        // PARTITION_DATA_ID_START and each partition column is named after
        // its source column with a `_p_` prefix.
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        // NOTE(review): the four-way match enumerates the optional
        // `location`/`partition_spec` combinations — presumably because the
        // builder has no setter accepting `Option`; confirm against the
        // iceberg-rust `TableCreation` API.
        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}
205
// Field names under which `get_fields` registers a map's key/value types,
// matching the standard Arrow child field names for map entries.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
208
209fn get_fields<'a>(
210    our_field_type: &'a risingwave_common::types::DataType,
211    data_type: &ArrowDataType,
212    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
213) -> Option<ArrowFields> {
214    match data_type {
215        ArrowDataType::Struct(fields) => {
216            match our_field_type {
217                risingwave_common::types::DataType::Struct(struct_fields) => {
218                    struct_fields.iter().for_each(|(name, data_type)| {
219                        let res = schema_fields.insert(name, data_type);
220                        // This assert is to make sure there is no duplicate field name in the schema.
221                        assert!(res.is_none())
222                    });
223                }
224                risingwave_common::types::DataType::Map(map_fields) => {
225                    schema_fields.insert(MAP_KEY, map_fields.key());
226                    schema_fields.insert(MAP_VALUE, map_fields.value());
227                }
228                risingwave_common::types::DataType::List(list) => {
229                    list.elem()
230                        .as_struct()
231                        .iter()
232                        .for_each(|(name, data_type)| {
233                            let res = schema_fields.insert(name, data_type);
234                            // This assert is to make sure there is no duplicate field name in the schema.
235                            assert!(res.is_none())
236                        });
237                }
238                _ => {}
239            };
240            Some(fields.clone())
241        }
242        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
243            get_fields(our_field_type, field.data_type(), schema_fields)
244        }
245        _ => None, // not a supported complex type and unlikely to show up
246    }
247}
248
249fn check_compatibility(
250    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
251    fields: &ArrowFields,
252) -> anyhow::Result<bool> {
253    for arrow_field in fields {
254        let our_field_type = schema_fields
255            .get(arrow_field.name().as_str())
256            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
257
258        // Iceberg source should be able to read iceberg decimal type.
259        let converted_arrow_data_type = IcebergArrowConvert
260            .to_arrow_field("", our_field_type)
261            .map_err(|e| anyhow!(e))?
262            .data_type()
263            .clone();
264
265        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
266            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
267            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
268            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
269            (ArrowDataType::List(_), ArrowDataType::List(field))
270            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
271                let mut schema_fields = HashMap::new();
272                get_fields(our_field_type, field.data_type(), &mut schema_fields)
273                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
274            }
275            // validate nested structs
276            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
277                let mut schema_fields = HashMap::new();
278                our_field_type
279                    .as_struct()
280                    .iter()
281                    .for_each(|(name, data_type)| {
282                        let res = schema_fields.insert(name, data_type);
283                        // This assert is to make sure there is no duplicate field name in the schema.
284                        assert!(res.is_none())
285                    });
286                check_compatibility(schema_fields, fields)?
287            }
288            // cases where left != right (metadata, field name mismatch)
289            //
290            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
291            // {"PARQUET:field_id": ".."}
292            //
293            // map: The standard name in arrow is "entries", "key", "value".
294            // in iceberg-rs, it's called "key_value"
295            (left, right) => left.equals_datatype(right),
296        };
297        if !compatible {
298            bail!(
299                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
300                arrow_field.name(),
301                converted_arrow_data_type,
302                arrow_field.data_type()
303            );
304        }
305    }
306    Ok(true)
307}
308
309/// Try to match our schema with iceberg schema.
310pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
311    if rw_schema.fields.len() != arrow_schema.fields().len() {
312        bail!(
313            "Schema length mismatch, risingwave is {}, and iceberg is {}",
314            rw_schema.fields.len(),
315            arrow_schema.fields.len()
316        );
317    }
318
319    let mut schema_fields = HashMap::new();
320    rw_schema.fields.iter().for_each(|field| {
321        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
322        // This assert is to make sure there is no duplicate field name in the schema.
323        assert!(res.is_none())
324    });
325
326    check_compatibility(schema_fields, &arrow_schema.fields)?;
327    Ok(())
328}
329
330pub fn parse_partition_by_exprs(
331    expr: String,
332) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
333    // captures column, transform(column), transform(n,column), transform(n, column)
334    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
335    if !re.is_match(&expr) {
336        bail!(format!(
337            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
338            expr
339        ))
340    }
341    let caps = re.captures_iter(&expr);
342
343    let mut partition_columns = vec![];
344
345    for mat in caps {
346        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
347            (&mat["transform"], Transform::Identity)
348        } else {
349            let mut func = mat["transform"].to_owned();
350            if func == "bucket" || func == "truncate" {
351                let n = &mat
352                    .name("n")
353                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
354                    .as_str();
355                func = format!("{func}[{n}]");
356            }
357            (
358                &mat["field"],
359                Transform::from_str(&func)
360                    .with_context(|| format!("invalid transform function {}", func))?,
361            )
362        };
363        partition_columns.push((column.to_owned(), transform));
364    }
365    Ok(partition_columns)
366}