// risingwave_connector/sink/iceberg/create_table.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{
22    NullOrder, SortDirection, SortField, SortOrder, TableProperties, Transform,
23    UnboundPartitionField, UnboundPartitionSpec,
24};
25use iceberg::table::Table;
26use iceberg::{Catalog, NamespaceIdent, TableCreation};
27use itertools::Itertools;
28use regex::Regex;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{
30    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
31    Schema as ArrowSchema,
32};
33use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
34use risingwave_common::bail;
35use risingwave_common::catalog::Schema;
36use url::Url;
37
38use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
39use crate::sink::{Result, SinkParam};
40
/// Matches a plain (unquoted) identifier: a leading ASCII letter or
/// underscore followed by letters, digits, or underscores. Used to reject
/// anything but bare column names in the sink's `order_key` option.
static ORDER_KEY_COLUMN_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").expect("valid order key regex"));
43
/// One parsed element of the sink's `order_key` option, e.g. the item
/// `c1 DESC NULLS LAST` in `order_key = 'c1 DESC NULLS LAST, c2'`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IcebergOrderKeyField {
    // Plain (unquoted) column name as written by the user.
    pub column: String,
    // ASC (default) or DESC.
    pub direction: SortDirection,
    // NULLS FIRST / NULLS LAST; defaults derived from `direction` when omitted.
    pub null_order: NullOrder,
}
50
51impl IcebergOrderKeyField {
52    fn default_null_order(direction: SortDirection) -> NullOrder {
53        match direction {
54            SortDirection::Ascending => NullOrder::First,
55            SortDirection::Descending => NullOrder::Last,
56        }
57    }
58}
59
60pub(super) async fn create_and_validate_table_impl(
61    config: &IcebergConfig,
62    param: &SinkParam,
63) -> Result<Table> {
64    if config.create_table_if_not_exists {
65        create_table_if_not_exists_impl(config, param).await?;
66    }
67
68    let table = config
69        .load_table()
70        .await
71        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
72
73    let sink_schema = param.schema();
74    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
75        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
76
77    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
78        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
79
80    Ok(table)
81}
82
/// Create the Iceberg table for this sink if it does not already exist.
///
/// Ensures the namespace chain exists, converts the RisingWave sink schema to
/// an Iceberg schema (via Arrow), derives the optional table location,
/// partition spec and sort order from `config`, and issues `create_table`
/// against the catalog.
///
/// # Errors
/// Returns `SinkError::Iceberg` when catalog calls, schema conversion,
/// warehouse-path validation, or partition/order-key parsing fail.
pub(super) async fn create_table_if_not_exists_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    let namespace = table_id.namespace().clone();
    let table_name = table_id.name().to_owned();
    // Create any missing ancestor namespaces before probing the table.
    create_namespace_if_not_exists(catalog.as_ref(), &namespace).await?;

    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        // convert risingwave schema -> arrow schema -> iceberg schema
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        // Explicit table location `<warehouse>/<namespace path>/<table>`;
        // `None` lets the catalog choose the location itself.
        let location = {
            let mut names = namespace.clone().inner();
            names.push(table_name.clone());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    // BigLake catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        // For rest catalog, the warehouse_path could be a warehouse name.
                        // In this case, we should specify the location when creating a table.
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        // Optional partition spec parsed from `config.partition_by`
        // (e.g. `bucket(16,c1), c2`).
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                // `_p_` prefix keeps partition field names from
                                // clashing with source column names.
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        // Optional sort order parsed from `config.order_key`.
        let sort_order = match &config.order_key {
            Some(order_key) => Some(build_sort_order(order_key, &iceberg_schema)?),
            None => None,
        };

        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(table_name)
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        // NOTE(review): this exhaustive match over the three optional fields is
        // presumably required because the builder's setters consume and retype
        // the builder, so they cannot be applied conditionally to one variable
        // — confirm against the iceberg crate before simplifying.
        let table_creation = match (location, partition_spec, sort_order) {
            (Some(location), Some(partition_spec), Some(sort_order)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .sort_order(sort_order)
                .build(),
            (Some(location), Some(partition_spec), None) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None, Some(sort_order)) => table_creation_builder
                .location(location)
                .sort_order(sort_order)
                .build(),
            (Some(location), None, None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec), Some(sort_order)) => table_creation_builder
                .partition_spec(partition_spec)
                .sort_order(sort_order)
                .build(),
            (None, Some(partition_spec), None) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None, Some(sort_order)) => table_creation_builder.sort_order(sort_order).build(),
            (None, None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}
234
235async fn create_namespace_if_not_exists(
236    catalog: &dyn Catalog,
237    namespace: &NamespaceIdent,
238) -> Result<()> {
239    let mut namespaces = vec![namespace.clone()];
240    let mut parent = namespace.parent();
241    while let Some(parent_namespace) = parent {
242        parent = parent_namespace.parent();
243        namespaces.push(parent_namespace);
244    }
245
246    for namespace in namespaces.into_iter().rev() {
247        if !catalog
248            .namespace_exists(&namespace)
249            .await
250            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
251        {
252            catalog
253                .create_namespace(&namespace, HashMap::default())
254                .await
255                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
256                .with_context(|| format!("failed to create iceberg namespace: {namespace}"))?;
257        }
258    }
259
260    Ok(())
261}
262
// Fixed Arrow child-field names for the key/value entries of a map type.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
265
266fn get_fields<'a>(
267    our_field_type: &'a risingwave_common::types::DataType,
268    data_type: &ArrowDataType,
269    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
270) -> Option<ArrowFields> {
271    match data_type {
272        ArrowDataType::Struct(fields) => {
273            match our_field_type {
274                risingwave_common::types::DataType::Struct(struct_fields) => {
275                    struct_fields.iter().for_each(|(name, data_type)| {
276                        let res = schema_fields.insert(name, data_type);
277                        // This assert is to make sure there is no duplicate field name in the schema.
278                        assert!(res.is_none())
279                    });
280                }
281                risingwave_common::types::DataType::Map(map_fields) => {
282                    schema_fields.insert(MAP_KEY, map_fields.key());
283                    schema_fields.insert(MAP_VALUE, map_fields.value());
284                }
285                risingwave_common::types::DataType::List(list) => {
286                    list.elem()
287                        .as_struct()
288                        .iter()
289                        .for_each(|(name, data_type)| {
290                            let res = schema_fields.insert(name, data_type);
291                            // This assert is to make sure there is no duplicate field name in the schema.
292                            assert!(res.is_none())
293                        });
294                }
295                _ => {}
296            };
297            Some(fields.clone())
298        }
299        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
300            get_fields(our_field_type, field.data_type(), schema_fields)
301        }
302        _ => None, // not a supported complex type and unlikely to show up
303    }
304}
305
306fn check_compatibility(
307    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
308    fields: &ArrowFields,
309) -> anyhow::Result<bool> {
310    for arrow_field in fields {
311        let our_field_type = schema_fields
312            .get(arrow_field.name().as_str())
313            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
314
315        // Iceberg source should be able to read iceberg decimal type.
316        let converted_arrow_data_type = IcebergArrowConvert
317            .to_arrow_field("", our_field_type)
318            .map_err(|e| anyhow!(e))?
319            .data_type()
320            .clone();
321
322        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
323            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
324            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
325            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
326            (ArrowDataType::List(_), ArrowDataType::List(field))
327            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
328                let mut schema_fields = HashMap::new();
329                get_fields(our_field_type, field.data_type(), &mut schema_fields)
330                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
331            }
332            // validate nested structs
333            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
334                let mut schema_fields = HashMap::new();
335                our_field_type
336                    .as_struct()
337                    .iter()
338                    .for_each(|(name, data_type)| {
339                        let res = schema_fields.insert(name, data_type);
340                        // This assert is to make sure there is no duplicate field name in the schema.
341                        assert!(res.is_none())
342                    });
343                check_compatibility(schema_fields, fields)?
344            }
345            // cases where left != right (metadata, field name mismatch)
346            //
347            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
348            // {"PARQUET:field_id": ".."}
349            //
350            // map: The standard name in arrow is "entries", "key", "value".
351            // in iceberg-rs, it's called "key_value"
352            (left, right) => left.equals_datatype(right),
353        };
354        if !compatible {
355            bail!(
356                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
357                arrow_field.name(),
358                converted_arrow_data_type,
359                arrow_field.data_type()
360            );
361        }
362    }
363    Ok(true)
364}
365
366/// Try to match our schema with iceberg schema.
367pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
368    if rw_schema.fields.len() != arrow_schema.fields().len() {
369        bail!(
370            "Schema length mismatch, risingwave is {}, and iceberg is {}",
371            rw_schema.fields.len(),
372            arrow_schema.fields.len()
373        );
374    }
375
376    let mut schema_fields = HashMap::new();
377    rw_schema.fields.iter().for_each(|field| {
378        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
379        // This assert is to make sure there is no duplicate field name in the schema.
380        assert!(res.is_none())
381    });
382
383    check_compatibility(schema_fields, &arrow_schema.fields)?;
384    Ok(())
385}
386
387pub fn parse_partition_by_exprs(
388    expr: String,
389) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
390    // captures column, transform(column), transform(n,column), transform(n, column)
391    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
392    if !re.is_match(&expr) {
393        bail!(format!(
394            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
395            expr
396        ))
397    }
398    let caps = re.captures_iter(&expr);
399
400    let mut partition_columns = vec![];
401
402    for mat in caps {
403        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
404            (&mat["transform"], Transform::Identity)
405        } else {
406            let mut func = mat["transform"].to_owned();
407            if func == "bucket" || func == "truncate" {
408                let n = &mat
409                    .name("n")
410                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
411                    .as_str();
412                func = format!("{func}[{n}]");
413            }
414            (
415                &mat["field"],
416                Transform::from_str(&func)
417                    .with_context(|| format!("invalid transform function {}", func))?,
418            )
419        };
420        partition_columns.push((column.to_owned(), transform));
421    }
422    Ok(partition_columns)
423}
424
425pub fn parse_order_key_exprs(
426    expr: String,
427) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
428    let mut order_keys = Vec::new();
429    let mut seen_columns = std::collections::HashSet::new();
430
431    for raw_item in expr.split(',') {
432        let item = raw_item.trim();
433        if item.is_empty() {
434            bail!("Invalid order key: empty item in `{expr}`");
435        }
436
437        let tokens = item.split_whitespace().collect_vec();
438        if tokens.is_empty() {
439            bail!("Invalid order key item `{item}`");
440        }
441        if tokens.len() > 4 {
442            bail!(
443                "Invalid order key item `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
444            );
445        }
446
447        let column = tokens[0];
448        if !ORDER_KEY_COLUMN_RE.is_match(column) {
449            bail!(
450                "Invalid order key column `{column}`\nHINT: Only plain column names are supported in order_key"
451            );
452        }
453        if !seen_columns.insert(column.to_ascii_lowercase()) {
454            bail!("Duplicate column `{column}` in order_key");
455        }
456
457        let mut direction = SortDirection::Ascending;
458        let mut null_order = None;
459        let mut idx = 1;
460        while idx < tokens.len() {
461            match tokens[idx].to_ascii_lowercase().as_str() {
462                "asc" => {
463                    direction = SortDirection::Ascending;
464                    idx += 1;
465                }
466                "desc" => {
467                    direction = SortDirection::Descending;
468                    idx += 1;
469                }
470                "nulls" => {
471                    let order = tokens.get(idx + 1).ok_or_else(|| {
472                        anyhow!(
473                            "Invalid order key item `{item}`: `NULLS` must be followed by `FIRST` or `LAST`"
474                        )
475                    })?;
476                    null_order = Some(match order.to_ascii_lowercase().as_str() {
477                        "first" => NullOrder::First,
478                        "last" => NullOrder::Last,
479                        _ => bail!(
480                            "Invalid order key item `{item}`\nHINT: `NULLS` must be followed by `FIRST` or `LAST`"
481                        ),
482                    });
483                    idx += 2;
484                }
485                token => {
486                    bail!(
487                        "Invalid order key token `{token}` in `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
488                    );
489                }
490            }
491        }
492
493        order_keys.push(IcebergOrderKeyField {
494            column: column.to_owned(),
495            direction,
496            null_order: null_order
497                .unwrap_or_else(|| IcebergOrderKeyField::default_null_order(direction)),
498        });
499    }
500
501    if order_keys.is_empty() {
502        bail!("order_key must not be empty");
503    }
504
505    Ok(order_keys)
506}
507
508pub fn validate_order_key_columns<'a>(
509    order_key: &str,
510    columns: impl IntoIterator<Item = &'a str>,
511) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
512    let parsed = parse_order_key_exprs(order_key.to_owned())?;
513    let columns = columns
514        .into_iter()
515        .map(|column| column.to_ascii_lowercase())
516        .collect::<std::collections::HashSet<_>>();
517    for item in &parsed {
518        if item.column.starts_with('_') {
519            bail!(
520                "System column `{}` is not allowed in order_key",
521                item.column
522            );
523        }
524        if !columns.contains(&item.column.to_ascii_lowercase()) {
525            bail!("Order key column does not exist in schema: {}", item.column);
526        }
527    }
528    Ok(parsed)
529}
530
531fn build_sort_order(order_key: &str, schema: &iceberg::spec::Schema) -> Result<SortOrder> {
532    let order_fields = validate_order_key_columns(
533        order_key,
534        schema
535            .as_struct()
536            .fields()
537            .iter()
538            .map(|field| field.name.as_str()),
539    )?;
540    let mut builder = SortOrder::builder();
541    for field in order_fields {
542        let source_id = schema.field_id_by_name(&field.column).ok_or_else(|| {
543            anyhow!(
544                "Order key column does not exist in schema: {}",
545                field.column
546            )
547        })?;
548        builder.with_sort_field(
549            SortField::builder()
550                .source_id(source_id)
551                .transform(Transform::Identity)
552                .direction(field.direction)
553                .null_order(field.null_order)
554                .build(),
555        );
556    }
557    builder
558        .build(schema)
559        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
560}