risingwave_frontend/handler/create_source/external_schema/
nexmark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17pub fn check_nexmark_schema(
18    props: &WithOptionsSecResolved,
19    row_id_index: Option<usize>,
20    columns: &[ColumnCatalog],
21) -> Result<()> {
22    let table_type = props
23        .get("nexmark.table.type")
24        .map(|t| t.to_ascii_lowercase());
25
26    let event_type = match table_type.as_deref() {
27        None => None,
28        Some("bid") => Some(EventType::Bid),
29        Some("auction") => Some(EventType::Auction),
30        Some("person") => Some(EventType::Person),
31        Some(t) => {
32            return Err(RwError::from(ProtocolError(format!(
33                "unsupported table type for nexmark source: {}",
34                t
35            ))));
36        }
37    };
38
39    // Ignore the generated columns and map the index of row_id column.
40    let user_defined_columns = columns.iter().filter(|c| !c.is_generated());
41    let row_id_index = if let Some(index) = row_id_index {
42        let col_id = columns[index].column_id();
43        user_defined_columns
44            .clone()
45            .position(|c| c.column_id() == col_id)
46            .unwrap()
47            .into()
48    } else {
49        None
50    };
51
52    let expected = get_event_data_types_with_names(event_type, row_id_index);
53    let user_defined = user_defined_columns
54        .map(|c| {
55            (
56                c.column_desc.name.to_ascii_lowercase(),
57                c.column_desc.data_type.to_owned(),
58            )
59        })
60        .collect_vec();
61
62    let schema_eq = expected.len() == user_defined.len()
63        && expected
64            .iter()
65            .zip_eq_fast(user_defined.iter())
66            .all(|((name1, type1), (name2, type2))| name1 == name2 && type1.equals_datatype(type2));
67
68    if !schema_eq {
69        let cmp = pretty_assertions::Comparison::new(&expected, &user_defined);
70        return Err(RwError::from(ProtocolError(format!(
71            "The schema of the nexmark source must specify all columns in order:\n{cmp}",
72        ))));
73    }
74    Ok(())
75}