// risingwave_frontend/handler/create_source/external_schema/nexmark.rs
use super::*;
/// Validates that the columns declared for a nexmark source match the schema
/// expected for the configured event type.
///
/// The event type is read from the `nexmark.table.type` option
/// (case-insensitively); when the option is absent, `None` is passed on to
/// `get_event_data_types_with_names`. Generated columns are excluded before
/// the comparison, and `row_id_index` is translated from an index into
/// `columns` to an index into the remaining (non-generated) columns.
///
/// # Errors
///
/// Returns a `ProtocolError` when
/// * `nexmark.table.type` is set to something other than `bid`, `auction` or
///   `person`, or
/// * the lower-cased names and data types of the non-generated columns differ
///   from the expected list (a diff of both lists is included in the message).
///
/// # Panics
///
/// Panics if `row_id_index` is out of bounds for `columns`, or if the column it
/// points at is not among the non-generated columns.
pub fn check_nexmark_schema(
    props: &WithOptionsSecResolved,
    row_id_index: Option<usize>,
    columns: &[ColumnCatalog],
) -> Result<()> {
    // Resolve the optional table type into an event type, rejecting unknown values.
    let event_type = props
        .get("nexmark.table.type")
        .map(|raw| match raw.to_ascii_lowercase().as_str() {
            "bid" => Ok(EventType::Bid),
            "auction" => Ok(EventType::Auction),
            "person" => Ok(EventType::Person),
            other => Err(RwError::from(ProtocolError(format!(
                "unsupported table type for nexmark source: {}",
                other
            )))),
        })
        .transpose()?;

    // Only non-generated columns take part in the schema comparison.
    let declared_columns = columns.iter().filter(|col| !col.is_generated());

    // Re-express the row-id position relative to the filtered column list.
    let row_id_index = row_id_index.map(|index| {
        let row_id = columns[index].column_id();
        declared_columns
            .clone()
            .position(|col| col.column_id() == row_id)
            .unwrap()
    });

    let expected = get_event_data_types_with_names(event_type, row_id_index);
    let user_defined = declared_columns
        .map(|col| {
            let desc = &col.column_desc;
            (desc.name.to_ascii_lowercase(), desc.data_type.clone())
        })
        .collect::<Vec<_>>();

    if expected == user_defined {
        return Ok(());
    }

    // Render a readable diff so the user can see which columns are off.
    let cmp = pretty_assertions::Comparison::new(&expected, &user_defined);
    Err(RwError::from(ProtocolError(format!(
        "The schema of the nexmark source must specify all columns in order:\n{cmp}",
    ))))
}