1use std::collections::HashMap;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use iceberg::arrow::schema_to_arrow_schema;
21use iceberg::spec::{
22 NullOrder, SortDirection, SortField, SortOrder, TableProperties, Transform,
23 UnboundPartitionField, UnboundPartitionSpec,
24};
25use iceberg::table::Table;
26use iceberg::{Catalog, NamespaceIdent, TableCreation};
27use itertools::Itertools;
28use regex::Regex;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{
30 self, DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields,
31 Schema as ArrowSchema,
32};
33use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
34use risingwave_common::bail;
35use risingwave_common::catalog::Schema;
36use risingwave_common::util::iter_util::ZipEqFast;
37use url::Url;
38
39use super::{IcebergConfig, PARTITION_DATA_ID_START, SinkError};
40use crate::sink::{Result, SinkParam};
41
42static ORDER_KEY_COLUMN_RE: LazyLock<Regex> =
43 LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").expect("valid order key regex"));
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct IcebergOrderKeyField {
47 pub column: String,
48 pub direction: SortDirection,
49 pub null_order: NullOrder,
50}
51
52impl IcebergOrderKeyField {
53 fn default_null_order(direction: SortDirection) -> NullOrder {
54 match direction {
55 SortDirection::Ascending => NullOrder::First,
56 SortDirection::Descending => NullOrder::Last,
57 }
58 }
59}
60
61pub async fn create_and_validate_table_impl(
62 config: &IcebergConfig,
63 param: &SinkParam,
64) -> Result<Table> {
65 if config.create_table_if_not_exists {
66 create_table_if_not_exists_impl(config, param).await?;
67 }
68
69 let table = config
70 .load_table()
71 .await
72 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
73
74 let sink_schema = param.schema();
75 let iceberg_arrow_schema = schema_to_arrow_schema(table.metadata().current_schema())
76 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
77
78 try_matches_arrow_schema(&sink_schema, &iceberg_arrow_schema)
79 .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
80
81 Ok(table)
82}
83
84pub(super) async fn create_table_if_not_exists_impl(
85 config: &IcebergConfig,
86 param: &SinkParam,
87) -> Result<()> {
88 let catalog = config.create_catalog().await?;
89 let table_id = config
90 .full_table_name()
91 .context("Unable to parse table name")?;
92 let namespace = table_id.namespace().clone();
93 let table_name = table_id.name().to_owned();
94 create_namespace_if_not_exists(catalog.as_ref(), &namespace).await?;
95
96 if !catalog
97 .table_exists(&table_id)
98 .await
99 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
100 {
101 let iceberg_create_table_arrow_convert = IcebergCreateTableArrowConvert::default();
102 let arrow_fields = param
104 .columns
105 .iter()
106 .map(|column| {
107 Ok(iceberg_create_table_arrow_convert
108 .to_arrow_field(&column.name, &column.data_type)
109 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
110 .context(format!(
111 "failed to convert {}: {} to arrow type",
112 &column.name, &column.data_type
113 ))?)
114 })
115 .collect::<Result<Vec<ArrowField>>>()?;
116 let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
117 let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
118 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
119 .context("failed to convert arrow schema to iceberg schema")?;
120
121 let location = {
122 let mut names = namespace.clone().inner();
123 names.push(table_name.clone());
124 match &config.common.warehouse_path {
125 Some(warehouse_path) => {
126 let is_s3_tables = warehouse_path.starts_with("arn:aws:s3tables");
127 let is_bq_catalog_federation = warehouse_path.starts_with("bq://");
129 let url = Url::parse(warehouse_path);
130 if url.is_err() || is_s3_tables || is_bq_catalog_federation {
131 if config.common.catalog_type() == "rest"
134 || config.common.catalog_type() == "rest_rust"
135 {
136 None
137 } else {
138 bail!(format!("Invalid warehouse path: {}", warehouse_path))
139 }
140 } else if warehouse_path.ends_with('/') {
141 Some(format!("{}{}", warehouse_path, names.join("/")))
142 } else {
143 Some(format!("{}/{}", warehouse_path, names.join("/")))
144 }
145 }
146 None => None,
147 }
148 };
149
150 let partition_spec = match &config.partition_by {
151 Some(partition_by) => {
152 let mut partition_fields = Vec::<UnboundPartitionField>::new();
153 for (i, (column, transform)) in parse_partition_by_exprs(partition_by.clone())?
154 .into_iter()
155 .enumerate()
156 {
157 match iceberg_schema.field_id_by_name(&column) {
158 Some(id) => partition_fields.push(
159 UnboundPartitionField::builder()
160 .source_id(id)
161 .transform(transform)
162 .name(format!("_p_{}", column))
163 .field_id(PARTITION_DATA_ID_START + i as i32)
164 .build(),
165 ),
166 None => bail!(format!(
167 "Partition source column does not exist in schema: {}",
168 column
169 )),
170 };
171 }
172 Some(
173 UnboundPartitionSpec::builder()
174 .with_spec_id(0)
175 .add_partition_fields(partition_fields)
176 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
177 .context("failed to add partition columns")?
178 .build(),
179 )
180 }
181 None => None,
182 };
183
184 let sort_order = match &config.order_key {
185 Some(order_key) => Some(build_sort_order(order_key, &iceberg_schema)?),
186 None => None,
187 };
188
189 let properties = HashMap::from([(
191 TableProperties::PROPERTY_FORMAT_VERSION.to_owned(),
192 (config.format_version as u8).to_string(),
193 )]);
194
195 let table_creation_builder = TableCreation::builder()
196 .name(table_name)
197 .schema(iceberg_schema)
198 .format_version(config.table_format_version())
199 .properties(properties);
200
201 let table_creation = match (location, partition_spec, sort_order) {
202 (Some(location), Some(partition_spec), Some(sort_order)) => table_creation_builder
203 .location(location)
204 .partition_spec(partition_spec)
205 .sort_order(sort_order)
206 .build(),
207 (Some(location), Some(partition_spec), None) => table_creation_builder
208 .location(location)
209 .partition_spec(partition_spec)
210 .build(),
211 (Some(location), None, Some(sort_order)) => table_creation_builder
212 .location(location)
213 .sort_order(sort_order)
214 .build(),
215 (Some(location), None, None) => table_creation_builder.location(location).build(),
216 (None, Some(partition_spec), Some(sort_order)) => table_creation_builder
217 .partition_spec(partition_spec)
218 .sort_order(sort_order)
219 .build(),
220 (None, Some(partition_spec), None) => table_creation_builder
221 .partition_spec(partition_spec)
222 .build(),
223 (None, None, Some(sort_order)) => table_creation_builder.sort_order(sort_order).build(),
224 (None, None, None) => table_creation_builder.build(),
225 };
226
227 catalog
228 .create_table(&namespace, table_creation)
229 .await
230 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
231 .context("failed to create iceberg table")?;
232 }
233 Ok(())
234}
235
236async fn create_namespace_if_not_exists(
237 catalog: &dyn Catalog,
238 namespace: &NamespaceIdent,
239) -> Result<()> {
240 let mut namespaces = vec![namespace.clone()];
241 let mut parent = namespace.parent();
242 while let Some(parent_namespace) = parent {
243 parent = parent_namespace.parent();
244 namespaces.push(parent_namespace);
245 }
246
247 for namespace in namespaces.into_iter().rev() {
248 if !catalog
249 .namespace_exists(&namespace)
250 .await
251 .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
252 {
253 catalog
254 .create_namespace(&namespace, HashMap::default())
255 .await
256 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
257 .with_context(|| format!("failed to create iceberg namespace: {namespace}"))?;
258 }
259 }
260
261 Ok(())
262}
263
264const MAP_KEY: &str = "key";
265const MAP_VALUE: &str = "value";
266
267fn get_fields<'a>(
268 our_field_type: &'a risingwave_common::types::DataType,
269 data_type: &ArrowDataType,
270 schema_fields: &mut HashMap<&'a str, &'a risingwave_common::types::DataType>,
271) -> Option<ArrowFields> {
272 match data_type {
273 ArrowDataType::Struct(fields) => {
274 match our_field_type {
275 risingwave_common::types::DataType::Struct(struct_fields) => {
276 struct_fields.iter().for_each(|(name, data_type)| {
277 let res = schema_fields.insert(name, data_type);
278 assert!(res.is_none())
280 });
281 }
282 risingwave_common::types::DataType::Map(map_fields) => {
283 schema_fields.insert(MAP_KEY, map_fields.key());
284 schema_fields.insert(MAP_VALUE, map_fields.value());
285 }
286 risingwave_common::types::DataType::List(list) => {
287 list.elem()
288 .as_struct()
289 .iter()
290 .for_each(|(name, data_type)| {
291 let res = schema_fields.insert(name, data_type);
292 assert!(res.is_none())
294 });
295 }
296 _ => {}
297 };
298 Some(fields.clone())
299 }
300 ArrowDataType::List(field) | ArrowDataType::Map(field, _) => {
301 get_fields(our_field_type, field.data_type(), schema_fields)
302 }
303 _ => None, }
305}
306
307fn check_compatibility(
308 schema_fields: HashMap<&str, &risingwave_common::types::DataType>,
309 fields: &ArrowFields,
310) -> anyhow::Result<bool> {
311 for arrow_field in fields {
312 let our_field_type = schema_fields
313 .get(arrow_field.name().as_str())
314 .ok_or_else(|| anyhow!("Field {} not found in our schema", arrow_field.name()))?;
315
316 let converted_arrow_data_type = IcebergArrowConvert
318 .to_arrow_field("", our_field_type)
319 .map_err(|e| anyhow!(e))?
320 .data_type()
321 .clone();
322
323 let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
324 (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
325 (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
326 (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
327 (ArrowDataType::List(_), ArrowDataType::List(field))
328 | (ArrowDataType::Map(_, _), ArrowDataType::Map(field, _)) => {
329 let mut schema_fields = HashMap::new();
330 get_fields(our_field_type, field.data_type(), &mut schema_fields)
331 .is_none_or(|fields| check_compatibility(schema_fields, &fields).unwrap())
332 }
333 (ArrowDataType::Struct(_), ArrowDataType::Struct(fields)) => {
335 let mut schema_fields = HashMap::new();
336 our_field_type
337 .as_struct()
338 .iter()
339 .for_each(|(name, data_type)| {
340 let res = schema_fields.insert(name, data_type);
341 assert!(res.is_none())
343 });
344 check_compatibility(schema_fields, fields)?
345 }
346 (left, right) => left.equals_datatype(right),
354 };
355 if !compatible {
356 bail!(
357 "field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
358 arrow_field.name(),
359 converted_arrow_data_type,
360 arrow_field.data_type()
361 );
362 }
363 }
364 Ok(true)
365}
366
367pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
369 if rw_schema.fields.len() != arrow_schema.fields().len() {
370 bail!(
371 "Schema length mismatch, risingwave is {}, and iceberg is {}",
372 rw_schema.fields.len(),
373 arrow_schema.fields.len()
374 );
375 }
376
377 let mut schema_fields = HashMap::new();
378 rw_schema.fields.iter().for_each(|field| {
379 let res = schema_fields.insert(field.name.as_str(), &field.data_type);
380 assert!(res.is_none())
382 });
383
384 check_compatibility(schema_fields, &arrow_schema.fields)?;
385
386 for (idx, (rw_field, arrow_field)) in rw_schema
389 .fields
390 .iter()
391 .zip_eq_fast(arrow_schema.fields().iter())
392 .enumerate()
393 {
394 if rw_field.name.as_str() != arrow_field.name().as_str() {
395 bail!(
396 "Column order mismatch at position {}: the sink has column `{}` but the \
397 Iceberg table has column `{}`. The Iceberg sink maps columns to the table \
398 by position, so the sink's column order must match the Iceberg table \
399 columns [{}].",
400 idx,
401 rw_field.name,
402 arrow_field.name(),
403 arrow_schema.fields().iter().map(|f| f.name()).join(", "),
404 );
405 }
406 }
407
408 Ok(())
409}
410
411pub fn parse_partition_by_exprs(
412 expr: String,
413) -> std::result::Result<Vec<(String, Transform)>, anyhow::Error> {
414 let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
416 if !re.is_match(&expr) {
417 bail!(format!(
418 "Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)",
419 expr
420 ))
421 }
422 let caps = re.captures_iter(&expr);
423
424 let mut partition_columns = vec![];
425
426 for mat in caps {
427 let (column, transform) = if mat.name("n").is_none() && mat.name("field").is_none() {
428 (&mat["transform"], Transform::Identity)
429 } else {
430 let mut func = mat["transform"].to_owned();
431 if func == "bucket" || func == "truncate" {
432 let n = &mat
433 .name("n")
434 .ok_or_else(|| anyhow!("The `n` must be set with `bucket` and `truncate`"))?
435 .as_str();
436 func = format!("{func}[{n}]");
437 }
438 (
439 &mat["field"],
440 Transform::from_str(&func)
441 .with_context(|| format!("invalid transform function {}", func))?,
442 )
443 };
444 partition_columns.push((column.to_owned(), transform));
445 }
446 Ok(partition_columns)
447}
448
449pub fn parse_order_key_exprs(
450 expr: String,
451) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
452 let mut order_keys = Vec::new();
453 let mut seen_columns = std::collections::HashSet::new();
454
455 for raw_item in expr.split(',') {
456 let item = raw_item.trim();
457 if item.is_empty() {
458 bail!("Invalid order key: empty item in `{expr}`");
459 }
460
461 let tokens = item.split_whitespace().collect_vec();
462 if tokens.is_empty() {
463 bail!("Invalid order key item `{item}`");
464 }
465 if tokens.len() > 4 {
466 bail!(
467 "Invalid order key item `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
468 );
469 }
470
471 let column = tokens[0];
472 if !ORDER_KEY_COLUMN_RE.is_match(column) {
473 bail!(
474 "Invalid order key column `{column}`\nHINT: Only plain column names are supported in order_key"
475 );
476 }
477 if !seen_columns.insert(column.to_ascii_lowercase()) {
478 bail!("Duplicate column `{column}` in order_key");
479 }
480
481 let mut direction = SortDirection::Ascending;
482 let mut null_order = None;
483 let mut idx = 1;
484 while idx < tokens.len() {
485 match tokens[idx].to_ascii_lowercase().as_str() {
486 "asc" => {
487 direction = SortDirection::Ascending;
488 idx += 1;
489 }
490 "desc" => {
491 direction = SortDirection::Descending;
492 idx += 1;
493 }
494 "nulls" => {
495 let order = tokens.get(idx + 1).ok_or_else(|| {
496 anyhow!(
497 "Invalid order key item `{item}`: `NULLS` must be followed by `FIRST` or `LAST`"
498 )
499 })?;
500 null_order = Some(match order.to_ascii_lowercase().as_str() {
501 "first" => NullOrder::First,
502 "last" => NullOrder::Last,
503 _ => bail!(
504 "Invalid order key item `{item}`\nHINT: `NULLS` must be followed by `FIRST` or `LAST`"
505 ),
506 });
507 idx += 2;
508 }
509 token => {
510 bail!(
511 "Invalid order key token `{token}` in `{item}`\nHINT: Supported format is `column [asc|desc] [nulls first|last]`"
512 );
513 }
514 }
515 }
516
517 order_keys.push(IcebergOrderKeyField {
518 column: column.to_owned(),
519 direction,
520 null_order: null_order
521 .unwrap_or_else(|| IcebergOrderKeyField::default_null_order(direction)),
522 });
523 }
524
525 if order_keys.is_empty() {
526 bail!("order_key must not be empty");
527 }
528
529 Ok(order_keys)
530}
531
532pub fn validate_order_key_columns<'a>(
533 order_key: &str,
534 columns: impl IntoIterator<Item = &'a str>,
535) -> std::result::Result<Vec<IcebergOrderKeyField>, anyhow::Error> {
536 let parsed = parse_order_key_exprs(order_key.to_owned())?;
537 let columns = columns
538 .into_iter()
539 .map(|column| column.to_ascii_lowercase())
540 .collect::<std::collections::HashSet<_>>();
541 for item in &parsed {
542 if item.column.starts_with('_') {
543 bail!(
544 "System column `{}` is not allowed in order_key",
545 item.column
546 );
547 }
548 if !columns.contains(&item.column.to_ascii_lowercase()) {
549 bail!("Order key column does not exist in schema: {}", item.column);
550 }
551 }
552 Ok(parsed)
553}
554
555fn build_sort_order(order_key: &str, schema: &iceberg::spec::Schema) -> Result<SortOrder> {
556 let order_fields = validate_order_key_columns(
557 order_key,
558 schema
559 .as_struct()
560 .fields()
561 .iter()
562 .map(|field| field.name.as_str()),
563 )?;
564 let mut builder = SortOrder::builder();
565 for field in order_fields {
566 let source_id = schema.field_id_by_name(&field.column).ok_or_else(|| {
567 anyhow!(
568 "Order key column does not exist in schema: {}",
569 field.column
570 )
571 })?;
572 builder.with_sort_field(
573 SortField::builder()
574 .source_id(source_id)
575 .transform(Transform::Identity)
576 .direction(field.direction)
577 .null_order(field.null_order)
578 .build(),
579 );
580 }
581 builder
582 .build(schema)
583 .map_err(|e| SinkError::Iceberg(anyhow!(e)))
584}