risingwave_frontend/handler/create_source/external_schema/
nexmark.rs1use super::*;
16
17pub fn check_nexmark_schema(
18    props: &WithOptionsSecResolved,
19    row_id_index: Option<usize>,
20    columns: &[ColumnCatalog],
21) -> Result<()> {
22    let table_type = props
23        .get("nexmark.table.type")
24        .map(|t| t.to_ascii_lowercase());
25
26    let event_type = match table_type.as_deref() {
27        None => None,
28        Some("bid") => Some(EventType::Bid),
29        Some("auction") => Some(EventType::Auction),
30        Some("person") => Some(EventType::Person),
31        Some(t) => {
32            return Err(RwError::from(ProtocolError(format!(
33                "unsupported table type for nexmark source: {}",
34                t
35            ))));
36        }
37    };
38
39    let user_defined_columns = columns.iter().filter(|c| !c.is_generated());
41    let row_id_index = if let Some(index) = row_id_index {
42        let col_id = columns[index].column_id();
43        user_defined_columns
44            .clone()
45            .position(|c| c.column_id() == col_id)
46            .unwrap()
47            .into()
48    } else {
49        None
50    };
51
52    let expected = get_event_data_types_with_names(event_type, row_id_index);
53    let user_defined = user_defined_columns
54        .map(|c| {
55            (
56                c.column_desc.name.to_ascii_lowercase(),
57                c.column_desc.data_type.clone(),
58            )
59        })
60        .collect_vec();
61
62    let schema_eq = expected.len() == user_defined.len()
63        && expected
64            .iter()
65            .zip_eq_fast(user_defined.iter())
66            .all(|((name1, type1), (name2, type2))| name1 == name2 && type1.equals_datatype(type2));
67
68    if !schema_eq {
69        let cmp = pretty_assertions::Comparison::new(&expected, &user_defined);
70        return Err(RwError::from(ProtocolError(format!(
71            "The schema of the nexmark source must specify all columns in order:\n{cmp}",
72        ))));
73    }
74    Ok(())
75}