// risingwave_connector/sink/iceberg/create_table.rs
use std::collections::HashMap;
16use std::str::FromStr;
17
18use anyhow::{Context, anyhow};
19use iceberg::arrow::schema_to_arrow_schema;
20use iceberg::spec::{TableProperties, Transform, UnboundPartitionField, UnboundPartitionSpec};
21use iceberg::table::Table;
22use iceberg::{NamespaceIdent, TableCreation};
23use regex::Regex;
24use risingwave_common::array::arrow::arrow_schema_iceberg::{
25 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
26 Schema as ArrowSchema,
27};
28use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
29use risingwave_common::bail;
30use risingwave_common::catalog::Schema;
31use url::Url;
32
33use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
34use crate::sink::{Result, SinkParam};
35
36pub(super) async fn create_and_validate_table_impl(
37 config: &IcebergConfig,
38 param: &SinkParam,
39) -> Result<Table> {
40 if config.create_table_if_not_exists {
41 create_table_if_not_exists_impl(config, param).await?;
42 }
43
44 let table = config
45 .load_table()
46 .await
47 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
48
49 let sink_schema = param.schema();
50 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
51 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
52
53 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
54 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
55
56 Ok(table)
57}
58
/// Creates the iceberg table for this sink if it does not already exist.
///
/// Steps: ensure the namespace (database) exists, build an iceberg schema
/// from the sink's columns, derive an optional table location from the
/// configured warehouse path, build an optional partition spec from
/// `partition_by`, and finally issue `create_table` against the catalog.
///
/// Errors if no database name is configured, if the warehouse path is
/// invalid for a non-REST catalog, or if any catalog/conversion call fails.
pub(super) async fn create_table_if_not_exists_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<()> {
    let catalog = config.create_catalog().await?;
    // A database name is mandatory: it becomes the iceberg namespace, which
    // is created on demand if absent.
    let namespace = if let Some(database_name) = config.table.database_name() {
        let namespace = NamespaceIdent::new(database_name.to_owned());
        if !catalog
            .namespace_exists(&namespace)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            catalog
                .create_namespace(&namespace, HashMap::default())
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg namespace")?;
        }
        namespace
    } else {
        bail!("database name must be set if you want to create table")
    };

    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    // Creation is skipped entirely when the table already exists.
    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        // Convert each sink column to an arrow field, then the whole arrow
        // schema to an iceberg schema (which assigns field ids).
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        // Derive the table location as `<warehouse_path>/<namespace…>/<table>`.
        // Non-URL warehouse paths (S3 Tables ARNs, BigQuery `bq://` federation)
        // are only acceptable for REST catalogs, which assign the location
        // server-side (hence `None`).
        let location = {
            let mut names = namespace.clone().inner();
            names.push(config.table.table_name().to_owned());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        // Translate `partition_by` into an unbound partition spec; each entry
        // must name an existing schema column. Partition field ids start at
        // PARTITION_DATA_ID_START to avoid clashing with data field ids.
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(config.table.table_name().to_owned())
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        // The builder's setters take it by value, so each (location,
        // partition_spec) combination consumes it in a separate match arm.
        let table_creation = match (location, partition_spec) {
            (Some(location), Some(partition_spec)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec)) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}
205
// Arrow represents a map's entries as a struct with these two child field
// names; used below when flattening map types for compatibility checks.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
208
209fn get_fields<'a>(
210 our_field_type: &'a risingwave_common::types::DataType,
211 data_type: &ArrowDataType,
212 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
213) -> Option<ArrowFields> {
214 match data_type {
215 ArrowDataType::Struct(fields) => {
216 match our_field_type {
217 risingwave_common::types::DataType::Struct(struct_fields) => {
218 struct_fields.iter().for_each(|(name, data_type)| {
219 let res = schema_fields.insert(name, data_type);
220 assert!(res.is_none())
222 });
223 }
224 risingwave_common::types::DataType::Map(map_fields) => {
225 schema_fields.insert(MAP_KEY, map_fields.key());
226 schema_fields.insert(MAP_VALUE, map_fields.value());
227 }
228 risingwave_common::types::DataType::List(list) => {
229 list.elem()
230 .as_struct()
231 .iter()
232 .for_each(|(name, data_type)| {
233 let res = schema_fields.insert(name, data_type);
234 assert!(res.is_none())
236 });
237 }
238 _ => {}
239 };
240 Some(fields.clone())
241 }
242 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
243 get_fields(our_field_type, field.data_type(), schema_fields)
244 }
245 _ => None, }
247}
248
249fn check_compatibility(
250 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
251 fields: &ArrowFields,
252) -> anyhow::Result<bool> {
253 for arrow_field in fields {
254 let our_field_type = schema_fields
255 .get(arrow_field.name().as_str())
256 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
257
258 let converted_arrow_data_type = IcebergArrowConvert
260 .to_arrow_field("", our_field_type)
261 .map_err(|e| anyhow!(e))?
262 .data_type()
263 .clone();
264
265 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
266 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
267 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
268 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
269 (ArrowDataType::List(_), ArrowDataType::List(field))
270 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
271 let mut schema_fields = HashMap::new();
272 get_fields(our_field_type, field.data_type(), &mut schema_fields)
273 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
274 }
275 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
277 let mut schema_fields = HashMap::new();
278 our_field_type
279 .as_struct()
280 .iter()
281 .for_each(|(name, data_type)| {
282 let res = schema_fields.insert(name, data_type);
283 assert!(res.is_none())
285 });
286 check_compatibility(schema_fields, fields)?
287 }
288 (left, right) => left.equals_datatype(right),
296 };
297 if !compatible {
298 bail!(
299 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
300 arrow_field.name(),
301 converted_arrow_data_type,
302 arrow_field.data_type()
303 );
304 }
305 }
306 Ok(true)
307}
308
309pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
311 if rw_schema.fields.len() != arrow_schema.fields().len() {
312 bail!(
313 "Schema length mismatch, risingwave is {}, and iceberg is {}",
314 rw_schema.fields.len(),
315 arrow_schema.fields.len()
316 );
317 }
318
319 let mut schema_fields = HashMap::new();
320 rw_schema.fields.iter().for_each(|field| {
321 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
322 assert!(res.is_none())
324 });
325
326 check_compatibility(schema_fields, &arrow_schema.fields)?;
327 Ok(())
328}
329
330pub fn parse_partition_by_exprs(
331 expr: String,
332) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
333 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
335 if !re.is_match(&expr) {
336 bail!(format!(
337 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
338 expr
339 ))
340 }
341 let caps = re.captures_iter(&expr);
342
343 let mut partition_columns = vec![];
344
345 for mat in caps {
346 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
347 (&mat["transform"], Transform::Identity)
348 } else {
349 let mut func = mat["transform"].to_owned();
350 if func == "bucket" || func == "truncate" {
351 let n = &mat
352 .name("n")
353 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
354 .as_str();
355 func = format!("{func}[{n}]");
356 }
357 (
358 &mat["field"],
359 Transform::from_str(&func)
360 .with_context(|| format!("invalid transform function {}", func))?,
361 )
362 };
363 partition_columns.push((column.to_owned(), transform));
364 }
365 Ok(partition_columns)
366}