risingwave_frontend/handler/create_source/external_schema/iceberg.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17/// TODO: make hidden columns additional columns, instead of normal columns?
18pub async fn extract_iceberg_columns(
19 with_properties: &WithOptionsSecResolved,
20) -> anyhow::Result<Vec<ColumnCatalog>> {
21 let props = ConnectorProperties::extract(with_properties.clone(), true)?;
22 if let ConnectorProperties::Iceberg(properties) = props {
23 let table = properties.load_table().await?;
24 let iceberg_schema: arrow_schema_iceberg::Schema =
25 ::iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
26
27 let mut columns: Vec<ColumnCatalog> = iceberg_schema
28 .fields()
29 .iter()
30 .enumerate()
31 .map(|(i, field)| {
32 let column_desc = ColumnDesc::named(
33 field.name(),
34 ColumnId::new((i + 1).try_into().unwrap()),
35 IcebergArrowConvert.type_from_field(field).unwrap(),
36 );
37 ColumnCatalog {
38 column_desc,
39 // hide the _row_id column for iceberg engine table
40 // This column is auto generated when users define a table without primary key
41 is_hidden: field.name() == ROW_ID_COLUMN_NAME,
42 }
43 })
44 .collect();
45 columns.extend(ColumnCatalog::iceberg_hidden_cols());
46
47 Ok(columns)
48 } else {
49 Err(anyhow!(format!(
50 "Invalid properties for iceberg source: {:?}",
51 props
52 )))
53 }
54}