risingwave_frontend/handler/create_source/external_schema/
nexmark.rs1use super::*;
16
17pub fn check_nexmark_schema(
18 props: &WithOptionsSecResolved,
19 row_id_index: Option<usize>,
20 columns: &[ColumnCatalog],
21) -> Result<()> {
22 let table_type = props
23 .get("nexmark.table.type")
24 .map(|t| t.to_ascii_lowercase());
25
26 let event_type = match table_type.as_deref() {
27 None => None,
28 Some("bid") => Some(EventType::Bid),
29 Some("auction") => Some(EventType::Auction),
30 Some("person") => Some(EventType::Person),
31 Some(t) => {
32 return Err(RwError::from(ProtocolError(format!(
33 "unsupported table type for nexmark source: {}",
34 t
35 ))));
36 }
37 };
38
39 let user_defined_columns = columns.iter().filter(|c| !c.is_generated());
41 let row_id_index = if let Some(index) = row_id_index {
42 let col_id = columns[index].column_id();
43 user_defined_columns
44 .clone()
45 .position(|c| c.column_id() == col_id)
46 .unwrap()
47 .into()
48 } else {
49 None
50 };
51
52 let expected = get_event_data_types_with_names(event_type, row_id_index);
53 let user_defined = user_defined_columns
54 .map(|c| {
55 (
56 c.column_desc.name.to_ascii_lowercase(),
57 c.column_desc.data_type.to_owned(),
58 )
59 })
60 .collect_vec();
61
62 let schema_eq = expected.len() == user_defined.len()
63 && expected
64 .iter()
65 .zip_eq_fast(user_defined.iter())
66 .all(|((name1, type1), (name2, type2))| name1 == name2 && type1.equals_datatype(type2));
67
68 if !schema_eq {
69 let cmp = pretty_assertions::Comparison::new(&expected, &user_defined);
70 return Err(RwError::from(ProtocolError(format!(
71 "The schema of the nexmark source must specify all columns in order:\n{cmp}",
72 ))));
73 }
74 Ok(())
75}