risingwave_connector/sink/iceberg/
create_table.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{
22    NullOrder, SortDirection, SortField, SortOrder, TableProperties, Transform,
23    UnboundPartitionField, UnboundPartitionSpec,
24};
25use iceberg::table::Table;
26use iceberg::{Catalog, NamespaceIdent, TableCreation};
27use itertools::Itertools;
28use regex::Regex;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{
30    self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
31    Schema as ArrowSchema,
32};
33use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
34use risingwave_common::bail;
35use risingwave_common::catalog::Schema;
36use risingwave_common::util::iter_util::ZipEqFast;
37use url::Url;
38
39use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
40use crate::sink::{Result, SinkParam};
41
42static ORDER_KEY_COLUMN_RE: LazyLock<Regex> =
43    LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").expect("valid order key regex"));
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct IcebergOrderKeyField {
47    pub column: String,
48    pub direction: SortDirection,
49    pub null_order: NullOrder,
50}
51
52impl IcebergOrderKeyField {
53    fn default_null_order(direction: SortDirection) -> NullOrder {
54        match direction {
55            SortDirection::Ascending => NullOrder::First,
56            SortDirection::Descending => NullOrder::Last,
57        }
58    }
59}
60
61pub async fn create_and_validate_table_impl(
62    config: &IcebergConfig,
63    param: &SinkParam,
64) -> Result<Table> {
65    if config.create_table_if_not_exists {
66        create_table_if_not_exists_impl(config, param).await?;
67    }
68
69    let table = config
70        .load_table()
71        .await
72        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
73
74    let sink_schema = param.schema();
75    let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
76        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
77
78    try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
79        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
80
81    Ok(table)
82}
83
84pub(super) async fn create_table_if_not_exists_impl(
85    config: &IcebergConfig,
86    param: &SinkParam,
87) -> Result<()> {
88    let catalog = config.create_catalog().await?;
89    let table_id = config
90        .full_table_name()
91        .context("Unable to parse table name")?;
92    let namespace = table_id.namespace().clone();
93    let table_name = table_id.name().to_owned();
94    create_namespace_if_not_exists(catalog.as_ref(), &namespace).await?;
95
96    if !catalog
97        .table_exists(&table_id)
98        .await
99        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
100    {
101        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
102        // convert risingwave schema -> arrow schema -> iceberg schema
103        let arrow_fields = param
104            .columns
105            .iter()
106            .map(|column| {
107                Ok(iceberg_create_table_arrow_convert
108                    .to_arrow_field(&column.name, &column.data_type)
109                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
110                    .context(format!(
111                        "failed to convert {}: {} to arrow type",
112                        &column.name, &column.data_type
113                    ))?)
114            })
115            .collect::<Result<Vec<ArrowField>>>()?;
116        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
117        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
118            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
119            .context("failed to convert arrow schema to iceberg schema")?;
120
121        let location = {
122            let mut names = namespace.clone().inner();
123            names.push(table_name.clone());
124            match &config.common.warehouse_path {
125                Some(warehouse_path) => {
126                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
127                    // Lakehouse Iceberg REST catalog federation uses bq:// prefix for BigQuery-managed Iceberg tables.
128                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
129                    let url = Url::parse(warehouse_path);
130                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
131                        // For rest catalog, the warehouse_path could be a warehouse name.
132                        // In this case, we should specify the location when creating a table.
133                        if config.common.catalog_type() == "rest"
134                            || config.common.catalog_type() == "rest_rust"
135                        {
136                            None
137                        } else {
138                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
139                        }
140                    } else if warehouse_path.ends_with('/') {
141                        Some(format!("{}{}", warehouse_path, names.join("/")))
142                    } else {
143                        Some(format!("{}/{}", warehouse_path, names.join("/")))
144                    }
145                }
146                None => None,
147            }
148        };
149
150        let partition_spec = match &config.partition_by {
151            Some(partition_by) => {
152                let mut partition_fields = Vec::<UnboundPartitionField>::new();
153                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
154                    .into_iter()
155                    .enumerate()
156                {
157                    match iceberg_schema.field_id_by_name(&column) {
158                        Some(id) => partition_fields.push(
159                            UnboundPartitionField::builder()
160                                .source_id(id)
161                                .transform(transform)
162                                .name(format!("_p_{}", column))
163                                .field_id(PARTITION_DATA_ID_START + i as i32)
164                                .build(),
165                        ),
166                        None => bail!(format!(
167                            "Partition source column does not exist in schema: {}",
168                            column
169                        )),
170                    };
171                }
172                Some(
173                    UnboundPartitionSpec::builder()
174                        .with_spec_id(0)
175                        .add_partition_fields(partition_fields)
176                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
177                        .context("failed to add partition columns")?
178                        .build(),
179                )
180            }
181            None => None,
182        };
183
184        let sort_order = match &config.order_key {
185            Some(order_key) => Some(build_sort_order(order_key, &iceberg_schema)?),
186            None => None,
187        };
188
189        // Put format-version into table properties, because catalog like jdbc extract format-version from table properties.
190        let properties = HashMap::from([(
191            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
192            (config.format_version as u8).to_string(),
193        )]);
194
195        let table_creation_builder = TableCreation::builder()
196            .name(table_name)
197            .schema(iceberg_schema)
198            .format_version(config.table_format_version())
199            .properties(properties);
200
201        let table_creation = match (location, partition_spec, sort_order) {
202            (Some(location), Some(partition_spec), Some(sort_order)) => table_creation_builder
203                .location(location)
204                .partition_spec(partition_spec)
205                .sort_order(sort_order)
206                .build(),
207            (Some(location), Some(partition_spec), None) => table_creation_builder
208                .location(location)
209                .partition_spec(partition_spec)
210                .build(),
211            (Some(location), None, Some(sort_order)) => table_creation_builder
212                .location(location)
213                .sort_order(sort_order)
214                .build(),
215            (Some(location), None, None) => table_creation_builder.location(location).build(),
216            (None, Some(partition_spec), Some(sort_order)) => table_creation_builder
217                .partition_spec(partition_spec)
218                .sort_order(sort_order)
219                .build(),
220            (None, Some(partition_spec), None) => table_creation_builder
221                .partition_spec(partition_spec)
222                .build(),
223            (None, None, Some(sort_order)) => table_creation_builder.sort_order(sort_order).build(),
224            (None, None, None) => table_creation_builder.build(),
225        };
226
227        catalog
228            .create_table(&namespace, table_creation)
229            .await
230            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
231            .context("failed to create iceberg table")?;
232    }
233    Ok(())
234}
235
236async fn create_namespace_if_not_exists(
237    catalog: &dyn Catalog,
238    namespace: &NamespaceIdent,
239) -> Result<()> {
240    let mut namespaces = vec![namespace.clone()];
241    let mut parent = namespace.parent();
242    while let Some(parent_namespace) = parent {
243        parent = parent_namespace.parent();
244        namespaces.push(parent_namespace);
245    }
246
247    for namespace in namespaces.into_iter().rev() {
248        if !catalog
249            .namespace_exists(&namespace)
250            .await
251            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
252        {
253            catalog
254                .create_namespace(&namespace, HashMap::default())
255                .await
256                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
257                .with_context(|| format!("failed to create iceberg namespace: {namespace}"))?;
258        }
259    }
260
261    Ok(())
262}
263
264const MAP_KEY: &str = "key";
265const MAP_VALUE: &str = "value";
266
267fn get_fields<'a>(
268    our_field_type: &'a risingwave_common::types::DataType,
269    data_type: &ArrowDataType,
270    schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
271) -> Option<ArrowFields> {
272    match data_type {
273        ArrowDataType::Struct(fields) => {
274            match our_field_type {
275                risingwave_common::types::DataType::Struct(struct_fields) => {
276                    struct_fields.iter().for_each(|(name, data_type)| {
277                        let res = schema_fields.insert(name, data_type);
278                        // This assert is to make sure there is no duplicate field name in the schema.
279                        assert!(res.is_none())
280                    });
281                }
282                risingwave_common::types::DataType::Map(map_fields) => {
283                    schema_fields.insert(MAP_KEY, map_fields.key());
284                    schema_fields.insert(MAP_VALUE, map_fields.value());
285                }
286                risingwave_common::types::DataType::List(list) => {
287                    list.elem()
288                        .as_struct()
289                        .iter()
290                        .for_each(|(name, data_type)| {
291                            let res = schema_fields.insert(name, data_type);
292                            // This assert is to make sure there is no duplicate field name in the schema.
293                            assert!(res.is_none())
294                        });
295                }
296                _ => {}
297            };
298            Some(fields.clone())
299        }
300        ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
301            get_fields(our_field_type, field.data_type(), schema_fields)
302        }
303        _ => None, // not a supported complex type and unlikely to show up
304    }
305}
306
307fn check_compatibility(
308    schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
309    fields: &ArrowFields,
310) -> anyhow::Result<bool> {
311    for arrow_field in fields {
312        let our_field_type = schema_fields
313            .get(arrow_field.name().as_str())
314            .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
315
316        // Iceberg source should be able to read iceberg decimal type.
317        let converted_arrow_data_type = IcebergArrowConvert
318            .to_arrow_field("", our_field_type)
319            .map_err(|e| anyhow!(e))?
320            .data_type()
321            .clone();
322
323        let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
324            (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
325            (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
326            (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
327            (ArrowDataType::List(_), ArrowDataType::List(field))
328            | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
329                let mut schema_fields = HashMap::new();
330                get_fields(our_field_type, field.data_type(), &mut schema_fields)
331                    .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
332            }
333            // validate nested structs
334            (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
335                let mut schema_fields = HashMap::new();
336                our_field_type
337                    .as_struct()
338                    .iter()
339                    .for_each(|(name, data_type)| {
340                        let res = schema_fields.insert(name, data_type);
341                        // This assert is to make sure there is no duplicate field name in the schema.
342                        assert!(res.is_none())
343                    });
344                check_compatibility(schema_fields, fields)?
345            }
346            // cases where left != right (metadata, field name mismatch)
347            //
348            // all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
349            // {"PARQUET:field_id": ".."}
350            //
351            // map: The standard name in arrow is "entries", "key", "value".
352            // in iceberg-rs, it's called "key_value"
353            (left, right) => left.equals_datatype(right),
354        };
355        if !compatible {
356            bail!(
357                "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
358                arrow_field.name(),
359                converted_arrow_data_type,
360                arrow_field.data_type()
361            );
362        }
363    }
364    Ok(true)
365}
366
367/// Try to match our schema with iceberg schema.
368pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
369    if rw_schema.fields.len() != arrow_schema.fields().len() {
370        bail!(
371            "Schema length mismatch, risingwave is {}, and iceberg is {}",
372            rw_schema.fields.len(),
373            arrow_schema.fields.len()
374        );
375    }
376
377    let mut schema_fields = HashMap::new();
378    rw_schema.fields.iter().for_each(|field| {
379        let res = schema_fields.insert(field.name.as_str(), &field.data_type);
380        // This assert is to make sure there is no duplicate field name in the schema.
381        assert!(res.is_none())
382    });
383
384    check_compatibility(schema_fields, &arrow_schema.fields)?;
385
386    // The sink writes columns to the Iceberg table by position, so the column order
387    // must match. The check above only validates the name set and types.
388    for (idx, (rw_field, arrow_field)) in rw_schema
389        .fields
390        .iter()
391        .zip_eq_fast(arrow_schema.fields().iter())
392        .enumerate()
393    {
394        if rw_field.name.as_str() != arrow_field.name().as_str() {
395            bail!(
396                "Column order mismatch at position {}: the sink has column `{}` but the \
397                 Iceberg table has column `{}`. The Iceberg sink maps columns to the table \
398                 by position, so the sink's column order must match the Iceberg table \
399                 columns [{}].",
400                idx,
401                rw_field.name,
402                arrow_field.name(),
403                arrow_schema.fields().iter().map(|f| f.name()).join(", "),
404            );
405        }
406    }
407
408    Ok(())
409}
410
411pub fn parse_partition_by_exprs(
412    expr: String,
413) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
414    // captures column, transform(column), transform(n,column), transform(n, column)
415    let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
416    if !re.is_match(&expr) {
417        bail!(format!(
418            "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
419            expr
420        ))
421    }
422    let caps = re.captures_iter(&expr);
423
424    let mut partition_columns = vec![];
425
426    for mat in caps {
427        let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
428            (&mat["transform"], Transform::Identity)
429        } else {
430            let mut func = mat["transform"].to_owned();
431            if func == "bucket" || func == "truncate" {
432                let n = &mat
433                    .name("n")
434                    .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
435                    .as_str();
436                func = format!("{func}[{n}]");
437            }
438            (
439                &mat["field"],
440                Transform::from_str(&func)
441                    .with_context(|| format!("invalid transform function {}", func))?,
442            )
443        };
444        partition_columns.push((column.to_owned(), transform));
445    }
446    Ok(partition_columns)
447}
448
449pub fn parse_order_key_exprs(
450    expr: String,
451) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
452    let mut order_keys = Vec::new();
453    let mut seen_columns = std::collections::HashSet::new();
454
455    for raw_item in expr.split(',') {
456        let item = raw_item.trim();
457        if item.is_empty() {
458            bail!("Invalid order key: empty item in `{expr}`");
459        }
460
461        let tokens = item.split_whitespace().collect_vec();
462        if tokens.is_empty() {
463            bail!("Invalid order key item `{item}`");
464        }
465        if tokens.len() > 4 {
466            bail!(
467                "Invalid order key item `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
468            );
469        }
470
471        let column = tokens[0];
472        if !ORDER_KEY_COLUMN_RE.is_match(column) {
473            bail!(
474                "Invalid order key column `{column}`\nHINT: Only plain column names are supported in order_key"
475            );
476        }
477        if !seen_columns.insert(column.to_ascii_lowercase()) {
478            bail!("Duplicate column `{column}` in order_key");
479        }
480
481        let mut direction = SortDirection::Ascending;
482        let mut null_order = None;
483        let mut idx = 1;
484        while idx < tokens.len() {
485            match tokens[idx].to_ascii_lowercase().as_str() {
486                "asc" => {
487                    direction = SortDirection::Ascending;
488                    idx += 1;
489                }
490                "desc" => {
491                    direction = SortDirection::Descending;
492                    idx += 1;
493                }
494                "nulls" => {
495                    let order = tokens.get(idx + 1).ok_or_else(|| {
496                        anyhow!(
497                            "Invalid order key item `{item}`: `NULLS` must be followed by `FIRST` or `LAST`"
498                        )
499                    })?;
500                    null_order = Some(match order.to_ascii_lowercase().as_str() {
501                        "first" => NullOrder::First,
502                        "last" => NullOrder::Last,
503                        _ => bail!(
504                            "Invalid order key item `{item}`\nHINT: `NULLS` must be followed by `FIRST` or `LAST`"
505                        ),
506                    });
507                    idx += 2;
508                }
509                token => {
510                    bail!(
511                        "Invalid order key token `{token}` in `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
512                    );
513                }
514            }
515        }
516
517        order_keys.push(IcebergOrderKeyField {
518            column: column.to_owned(),
519            direction,
520            null_order: null_order
521                .unwrap_or_else(|| IcebergOrderKeyField::default_null_order(direction)),
522        });
523    }
524
525    if order_keys.is_empty() {
526        bail!("order_key must not be empty");
527    }
528
529    Ok(order_keys)
530}
531
532pub fn validate_order_key_columns<'a>(
533    order_key: &str,
534    columns: impl IntoIterator<Item = &'a str>,
535) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
536    let parsed = parse_order_key_exprs(order_key.to_owned())?;
537    let columns = columns
538        .into_iter()
539        .map(|column| column.to_ascii_lowercase())
540        .collect::<std::collections::HashSet<_>>();
541    for item in &parsed {
542        if item.column.starts_with('_') {
543            bail!(
544                "System column `{}` is not allowed in order_key",
545                item.column
546            );
547        }
548        if !columns.contains(&item.column.to_ascii_lowercase()) {
549            bail!("Order key column does not exist in schema: {}", item.column);
550        }
551    }
552    Ok(parsed)
553}
554
555fn build_sort_order(order_key: &str, schema: &iceberg::spec::Schema) -> Result<SortOrder> {
556    let order_fields = validate_order_key_columns(
557        order_key,
558        schema
559            .as_struct()
560            .fields()
561            .iter()
562            .map(|field| field.name.as_str()),
563    )?;
564    let mut builder = SortOrder::builder();
565    for field in order_fields {
566        let source_id = schema.field_id_by_name(&field.column).ok_or_else(|| {
567            anyhow!(
568                "Order key column does not exist in schema: {}",
569                field.column
570            )
571        })?;
572        builder.with_sort_field(
573            SortField::builder()
574                .source_id(source_id)
575                .transform(Transform::Identity)
576                .direction(field.direction)
577                .null_order(field.null_order)
578                .build(),
579        );
580    }
581    builder
582        .build(schema)
583        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
584}