1use std::collections::HashMap;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{
22 NullOrder, SortDirection, SortField, SortOrder, TableProperties, Transform,
23 UnboundPartitionField, UnboundPartitionSpec,
24};
25use iceberg::table::Table;
26use iceberg::{Catalog, NamespaceIdent, TableCreation};
27use itertools::Itertools;
28use regex::Regex;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{
30 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
31 Schema as ArrowSchema,
32};
33use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
34use risingwave_common::bail;
35use risingwave_common::catalog::Schema;
36use url::Url;
37
38use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
39use crate::sink::{Result, SinkParam};
40
/// Matches a plain (unquoted) identifier: ASCII letters, digits and
/// underscores, not starting with a digit. Used to validate column names in
/// the sink's `order_key` option.
static ORDER_KEY_COLUMN_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").expect("valid order key regex"));
43
/// One field of an Iceberg sort order, parsed from the sink's `order_key`
/// option (`column [asc|desc] [nulls first|last]`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IcebergOrderKeyField {
    // Source column name, case preserved as written by the user.
    pub column: String,
    // Ascending or descending sort direction.
    pub direction: SortDirection,
    // Whether NULLs sort before or after non-NULL values.
    pub null_order: NullOrder,
}
50
51impl IcebergOrderKeyField {
52 fn default_null_order(direction: SortDirection) -> NullOrder {
53 match direction {
54 SortDirection::Ascending => NullOrder::First,
55 SortDirection::Descending => NullOrder::Last,
56 }
57 }
58}
59
60pub(super) async fn create_and_validate_table_impl(
61 config: &IcebergConfig,
62 param: &SinkParam,
63) -> Result<Table> {
64 if config.create_table_if_not_exists {
65 create_table_if_not_exists_impl(config, param).await?;
66 }
67
68 let table = config
69 .load_table()
70 .await
71 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
72
73 let sink_schema = param.schema();
74 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
75 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
76
77 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
78 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
79
80 Ok(table)
81}
82
/// Creates the Iceberg table for this sink if it does not already exist.
///
/// Steps: resolve the table identifier from the config, ensure the namespace
/// chain exists, and — only when the table is absent — derive an Iceberg
/// schema from the sink columns, then create the table with the optional
/// location, partition spec and sort order taken from the sink config.
pub(super) async fn create_table_if_not_exists_impl(
    config: &IcebergConfig,
    param: &SinkParam,
) -> Result<()> {
    let catalog = config.create_catalog().await?;
    let table_id = config
        .full_table_name()
        .context("Unable to parse table name")?;
    let namespace = table_id.namespace().clone();
    let table_name = table_id.name().to_owned();
    // Parent namespaces must exist before the table can be created in them.
    create_namespace_if_not_exists(catalog.as_ref(), &namespace).await?;

    if !catalog
        .table_exists(&table_id)
        .await
        .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
    {
        // Sink columns -> Arrow fields -> Iceberg schema.
        let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
        let arrow_fields = param
            .columns
            .iter()
            .map(|column| {
                Ok(iceberg_create_table_arrow_convert
                    .to_arrow_field(&column.name, &column.data_type)
                    .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                    .context(format!(
                        "failed to convert {}: {} to arrow type",
                        &column.name, &column.data_type
                    ))?)
            })
            .collect::<Result<Vec<ArrowField>>>()?;
        let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
        let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to convert arrow schema to iceberg schema")?;

        // Derive an explicit table location `<warehouse>/<ns...>/<table>` from
        // the warehouse path when it is a parseable URL. S3 Tables ARNs and
        // BigQuery catalog-federation (`bq://`) paths are not URLs: for those
        // — and any unparsable path — a REST catalog is allowed to pick the
        // location itself (None); other catalog types bail.
        let location = {
            let mut names = namespace.clone().inner();
            names.push(table_name.clone());
            match &config.common.warehouse_path {
                Some(warehouse_path) => {
                    let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
                    let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
                    let url = Url::parse(warehouse_path);
                    if url.is_err() || is_s3_tables || is_bq_catalog_federation {
                        if config.common.catalog_type() == "rest"
                            || config.common.catalog_type() == "rest_rust"
                        {
                            None
                        } else {
                            bail!(format!("Invalid warehouse path: {}", warehouse_path))
                        }
                    } else if warehouse_path.ends_with('/') {
                        // Avoid a double slash when the path already ends with one.
                        Some(format!("{}{}", warehouse_path, names.join("/")))
                    } else {
                        Some(format!("{}/{}", warehouse_path, names.join("/")))
                    }
                }
                None => None,
            }
        };

        // Optional partition spec from `partition_by`: each partition field is
        // named `_p_<column>` and gets an id offset by PARTITION_DATA_ID_START
        // to keep it out of the data-field id range.
        let partition_spec = match &config.partition_by {
            Some(partition_by) => {
                let mut partition_fields = Vec::<UnboundPartitionField>::new();
                for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
                    .into_iter()
                    .enumerate()
                {
                    match iceberg_schema.field_id_by_name(&column) {
                        Some(id) => partition_fields.push(
                            UnboundPartitionField::builder()
                                .source_id(id)
                                .transform(transform)
                                .name(format!("_p_{}", column))
                                .field_id(PARTITION_DATA_ID_START + i as i32)
                                .build(),
                        ),
                        None => bail!(format!(
                            "Partition source column does not exist in schema: {}",
                            column
                        )),
                    };
                }
                Some(
                    UnboundPartitionSpec::builder()
                        .with_spec_id(0)
                        .add_partition_fields(partition_fields)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context("failed to add partition columns")?
                        .build(),
                )
            }
            None => None,
        };

        // Optional sort order from `order_key`, validated against the schema.
        let sort_order = match &config.order_key {
            Some(order_key) => Some(build_sort_order(order_key, &iceberg_schema)?),
            None => None,
        };

        // Record the configured format version as a table property as well.
        let properties = HashMap::from([(
            TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
            (config.format_version as u8).to_string(),
        )]);

        let table_creation_builder = TableCreation::builder()
            .name(table_name)
            .schema(iceberg_schema)
            .format_version(config.table_format_version())
            .properties(properties);

        // The builder's setters take concrete values rather than Options, so
        // every combination of the three optional parts needs its own arm.
        let table_creation = match (location, partition_spec, sort_order) {
            (Some(location), Some(partition_spec), Some(sort_order)) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .sort_order(sort_order)
                .build(),
            (Some(location), Some(partition_spec), None) => table_creation_builder
                .location(location)
                .partition_spec(partition_spec)
                .build(),
            (Some(location), None, Some(sort_order)) => table_creation_builder
                .location(location)
                .sort_order(sort_order)
                .build(),
            (Some(location), None, None) => table_creation_builder.location(location).build(),
            (None, Some(partition_spec), Some(sort_order)) => table_creation_builder
                .partition_spec(partition_spec)
                .sort_order(sort_order)
                .build(),
            (None, Some(partition_spec), None) => table_creation_builder
                .partition_spec(partition_spec)
                .build(),
            (None, None, Some(sort_order)) => table_creation_builder.sort_order(sort_order).build(),
            (None, None, None) => table_creation_builder.build(),
        };

        catalog
            .create_table(&namespace, table_creation)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))
            .context("failed to create iceberg table")?;
    }
    Ok(())
}
234
235async fn create_namespace_if_not_exists(
236 catalog: &dyn Catalog,
237 namespace: &NamespaceIdent,
238) -> Result<()> {
239 let mut namespaces = vec![namespace.clone()];
240 let mut parent = namespace.parent();
241 while let Some(parent_namespace) = parent {
242 parent = parent_namespace.parent();
243 namespaces.push(parent_namespace);
244 }
245
246 for namespace in namespaces.into_iter().rev() {
247 if !catalog
248 .namespace_exists(&namespace)
249 .await
250 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
251 {
252 catalog
253 .create_namespace(&namespace, HashMap::default())
254 .await
255 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
256 .with_context(|| format!("failed to create iceberg namespace: {namespace}"))?;
257 }
258 }
259
260 Ok(())
261}
262
// Child field names Arrow uses inside a Map entry struct.
const MAP_KEY: &str = "key";
const MAP_VALUE: &str = "value";
265
266fn get_fields<'a>(
267 our_field_type: &'a risingwave_common::types::DataType,
268 data_type: &ArrowDataType,
269 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
270) -> Option<ArrowFields> {
271 match data_type {
272 ArrowDataType::Struct(fields) => {
273 match our_field_type {
274 risingwave_common::types::DataType::Struct(struct_fields) => {
275 struct_fields.iter().for_each(|(name, data_type)| {
276 let res = schema_fields.insert(name, data_type);
277 assert!(res.is_none())
279 });
280 }
281 risingwave_common::types::DataType::Map(map_fields) => {
282 schema_fields.insert(MAP_KEY, map_fields.key());
283 schema_fields.insert(MAP_VALUE, map_fields.value());
284 }
285 risingwave_common::types::DataType::List(list) => {
286 list.elem()
287 .as_struct()
288 .iter()
289 .for_each(|(name, data_type)| {
290 let res = schema_fields.insert(name, data_type);
291 assert!(res.is_none())
293 });
294 }
295 _ => {}
296 };
297 Some(fields.clone())
298 }
299 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
300 get_fields(our_field_type, field.data_type(), schema_fields)
301 }
302 _ => None, }
304}
305
306fn check_compatibility(
307 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
308 fields: &ArrowFields,
309) -> anyhow::Result<bool> {
310 for arrow_field in fields {
311 let our_field_type = schema_fields
312 .get(arrow_field.name().as_str())
313 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
314
315 let converted_arrow_data_type = IcebergArrowConvert
317 .to_arrow_field("", our_field_type)
318 .map_err(|e| anyhow!(e))?
319 .data_type()
320 .clone();
321
322 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
323 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
324 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
325 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
326 (ArrowDataType::List(_), ArrowDataType::List(field))
327 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
328 let mut schema_fields = HashMap::new();
329 get_fields(our_field_type, field.data_type(), &mut schema_fields)
330 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
331 }
332 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
334 let mut schema_fields = HashMap::new();
335 our_field_type
336 .as_struct()
337 .iter()
338 .for_each(|(name, data_type)| {
339 let res = schema_fields.insert(name, data_type);
340 assert!(res.is_none())
342 });
343 check_compatibility(schema_fields, fields)?
344 }
345 (left, right) => left.equals_datatype(right),
353 };
354 if !compatible {
355 bail!(
356 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
357 arrow_field.name(),
358 converted_arrow_data_type,
359 arrow_field.data_type()
360 );
361 }
362 }
363 Ok(true)
364}
365
366pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
368 if rw_schema.fields.len() != arrow_schema.fields().len() {
369 bail!(
370 "Schema length mismatch, risingwave is {}, and iceberg is {}",
371 rw_schema.fields.len(),
372 arrow_schema.fields.len()
373 );
374 }
375
376 let mut schema_fields = HashMap::new();
377 rw_schema.fields.iter().for_each(|field| {
378 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
379 assert!(res.is_none())
381 });
382
383 check_compatibility(schema_fields, &arrow_schema.fields)?;
384 Ok(())
385}
386
387pub fn parse_partition_by_exprs(
388 expr: String,
389) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
390 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
392 if !re.is_match(&expr) {
393 bail!(format!(
394 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
395 expr
396 ))
397 }
398 let caps = re.captures_iter(&expr);
399
400 let mut partition_columns = vec![];
401
402 for mat in caps {
403 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
404 (&mat["transform"], Transform::Identity)
405 } else {
406 let mut func = mat["transform"].to_owned();
407 if func == "bucket" || func == "truncate" {
408 let n = &mat
409 .name("n")
410 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
411 .as_str();
412 func = format!("{func}[{n}]");
413 }
414 (
415 &mat["field"],
416 Transform::from_str(&func)
417 .with_context(|| format!("invalid transform function {}", func))?,
418 )
419 };
420 partition_columns.push((column.to_owned(), transform));
421 }
422 Ok(partition_columns)
423}
424
425pub fn parse_order_key_exprs(
426 expr: String,
427) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
428 let mut order_keys = Vec::new();
429 let mut seen_columns = std::collections::HashSet::new();
430
431 for raw_item in expr.split(',') {
432 let item = raw_item.trim();
433 if item.is_empty() {
434 bail!("Invalid order key: empty item in `{expr}`");
435 }
436
437 let tokens = item.split_whitespace().collect_vec();
438 if tokens.is_empty() {
439 bail!("Invalid order key item `{item}`");
440 }
441 if tokens.len() > 4 {
442 bail!(
443 "Invalid order key item `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
444 );
445 }
446
447 let column = tokens[0];
448 if !ORDER_KEY_COLUMN_RE.is_match(column) {
449 bail!(
450 "Invalid order key column `{column}`\nHINT: Only plain column names are supported in order_key"
451 );
452 }
453 if !seen_columns.insert(column.to_ascii_lowercase()) {
454 bail!("Duplicate column `{column}` in order_key");
455 }
456
457 let mut direction = SortDirection::Ascending;
458 let mut null_order = None;
459 let mut idx = 1;
460 while idx < tokens.len() {
461 match tokens[idx].to_ascii_lowercase().as_str() {
462 "asc" => {
463 direction = SortDirection::Ascending;
464 idx += 1;
465 }
466 "desc" => {
467 direction = SortDirection::Descending;
468 idx += 1;
469 }
470 "nulls" => {
471 let order = tokens.get(idx + 1).ok_or_else(|| {
472 anyhow!(
473 "Invalid order key item `{item}`: `NULLS` must be followed by `FIRST` or `LAST`"
474 )
475 })?;
476 null_order = Some(match order.to_ascii_lowercase().as_str() {
477 "first" => NullOrder::First,
478 "last" => NullOrder::Last,
479 _ => bail!(
480 "Invalid order key item `{item}`\nHINT: `NULLS` must be followed by `FIRST` or `LAST`"
481 ),
482 });
483 idx += 2;
484 }
485 token => {
486 bail!(
487 "Invalid order key token `{token}` in `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
488 );
489 }
490 }
491 }
492
493 order_keys.push(IcebergOrderKeyField {
494 column: column.to_owned(),
495 direction,
496 null_order: null_order
497 .unwrap_or_else(|| IcebergOrderKeyField::default_null_order(direction)),
498 });
499 }
500
501 if order_keys.is_empty() {
502 bail!("order_key must not be empty");
503 }
504
505 Ok(order_keys)
506}
507
508pub fn validate_order_key_columns<'a>(
509 order_key: &str,
510 columns: impl IntoIterator<Item = &'a str>,
511) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
512 let parsed = parse_order_key_exprs(order_key.to_owned())?;
513 let columns = columns
514 .into_iter()
515 .map(|column| column.to_ascii_lowercase())
516 .collect::<std::collections::HashSet<_>>();
517 for item in &parsed {
518 if item.column.starts_with('_') {
519 bail!(
520 "System column `{}` is not allowed in order_key",
521 item.column
522 );
523 }
524 if !columns.contains(&item.column.to_ascii_lowercase()) {
525 bail!("Order key column does not exist in schema: {}", item.column);
526 }
527 }
528 Ok(parsed)
529}
530
531fn build_sort_order(order_key: &str, schema: &iceberg::spec::Schema) -> Result<SortOrder> {
532 let order_fields = validate_order_key_columns(
533 order_key,
534 schema
535 .as_struct()
536 .fields()
537 .iter()
538 .map(|field| field.name.as_str()),
539 )?;
540 let mut builder = SortOrder::builder();
541 for field in order_fields {
542 let source_id = schema.field_id_by_name(&field.column).ok_or_else(|| {
543 anyhow!(
544 "Order key column does not exist in schema: {}",
545 field.column
546 )
547 })?;
548 builder.with_sort_field(
549 SortField::builder()
550 .source_id(source_id)
551 .transform(Transform::Identity)
552 .direction(field.direction)
553 .null_order(field.null_order)
554 .build(),
555 );
556 }
557 builder
558 .build(schema)
559 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
560}