risingwave_frontend/handler/create_source/external_schema/
iceberg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17/// TODO: make hidden columns additional columns, instead of normal columns?
18pub async fn extract_iceberg_columns(
19    with_properties: &WithOptionsSecResolved,
20) -> anyhow::Result<Vec<ColumnCatalog>> {
21    let props = ConnectorProperties::extract(with_properties.clone(), true)?;
22    if let ConnectorProperties::Iceberg(properties) = props {
23        let table = properties.load_table().await?;
24        let iceberg_schema: arrow_schema_iceberg::Schema =
25            ::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
26
27        let mut columns: Vec<ColumnCatalog> = iceberg_schema
28            .fields()
29            .iter()
30            .enumerate()
31            .map(|(i, field)| {
32                let column_desc = ColumnDesc::named(
33                    field.name(),
34                    ColumnId::new((i + 1).try_into().unwrap()),
35                    IcebergArrowConvert.type_from_field(field).unwrap(),
36                );
37                ColumnCatalog {
38                    column_desc,
39                    // hide the _row_id column for iceberg engine table
40                    // This column is auto generated when users define a table without primary key
41                    is_hidden: field.name() == ROW_ID_COLUMN_NAME,
42                }
43            })
44            .collect();
45        columns.extend(ColumnCatalog::iceberg_hidden_cols());
46
47        Ok(columns)
48    } else {
49        Err(anyhow!(format!(
50            "Invalid properties for iceberg source: {:?}",
51            props
52        )))
53    }
54}