risingwave_frontend/handler/create_source/validate.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use super::*;
16
17pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
18    LazyLock::new(|| {
19        hashset! {
20            PbConnectionType::Unspecified,
21            PbConnectionType::Kafka,
22            PbConnectionType::Iceberg,
23        }
24    });
25
26pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
27    LazyLock::new(|| {
28        hashset! {
29            PbConnectionType::Unspecified,
30            PbConnectionType::SchemaRegistry,
31        }
32    });
33
34// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array
35static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
36    LazyLock::new(|| {
37        convert_args!(hashmap!(
38                KAFKA_CONNECTOR => hashmap!(
39                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
40                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
41                    Format::Debezium => vec![Encode::Json, Encode::Avro],
42                    Format::Maxwell => vec![Encode::Json],
43                    Format::Canal => vec![Encode::Json],
44                    Format::DebeziumMongo => vec![Encode::Json],
45                ),
46                PULSAR_CONNECTOR => hashmap!(
47                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
48                    Format::Upsert => vec![Encode::Json, Encode::Avro],
49                    Format::Debezium => vec![Encode::Json],
50                    Format::Maxwell => vec![Encode::Json],
51                    Format::Canal => vec![Encode::Json],
52                ),
53                KINESIS_CONNECTOR => hashmap!(
54                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
55                    Format::Upsert => vec![Encode::Json, Encode::Avro],
56                    Format::Debezium => vec![Encode::Json],
57                    Format::Maxwell => vec![Encode::Json],
58                    Format::Canal => vec![Encode::Json],
59                ),
60                GOOGLE_PUBSUB_CONNECTOR => hashmap!(
61                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
62                    Format::Debezium => vec![Encode::Json],
63                    Format::Maxwell => vec![Encode::Json],
64                    Format::Canal => vec![Encode::Json],
65                ),
66                NEXMARK_CONNECTOR => hashmap!(
67                    Format::Native => vec![Encode::Native],
68                    Format::Plain => vec![Encode::Bytes],
69                ),
70                DATAGEN_CONNECTOR => hashmap!(
71                    Format::Native => vec![Encode::Native],
72                    Format::Plain => vec![Encode::Bytes, Encode::Json],
73                ),
74                OPENDAL_S3_CONNECTOR => hashmap!(
75                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
76                ),
77                GCS_CONNECTOR => hashmap!(
78                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
79                ),
80                AZBLOB_CONNECTOR => hashmap!(
81                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
82                ),
83                POSIX_FS_CONNECTOR => hashmap!(
84                    Format::Plain => vec![Encode::Csv],
85                ),
86                MYSQL_CDC_CONNECTOR => hashmap!(
87                    Format::Debezium => vec![Encode::Json],
88                    // support source stream job
89                    Format::Plain => vec![Encode::Json],
90                ),
91                POSTGRES_CDC_CONNECTOR => hashmap!(
92                    Format::Debezium => vec![Encode::Json],
93                    // support source stream job
94                    Format::Plain => vec![Encode::Json],
95                ),
96                CITUS_CDC_CONNECTOR => hashmap!(
97                    Format::Debezium => vec![Encode::Json],
98                ),
99                MONGODB_CDC_CONNECTOR => hashmap!(
100                    Format::DebeziumMongo => vec![Encode::Json],
101                ),
102                NATS_CONNECTOR => hashmap!(
103                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
104                ),
105                MQTT_CONNECTOR => hashmap!(
106                    Format::Plain => vec![Encode::Json, Encode::Bytes],
107                ),
108                TEST_CONNECTOR => hashmap!(
109                    Format::Plain => vec![Encode::Json],
110                ),
111                ICEBERG_CONNECTOR => hashmap!(
112                    Format::None => vec![Encode::None],
113                ),
114                SQL_SERVER_CDC_CONNECTOR => hashmap!(
115                    Format::Debezium => vec![Encode::Json],
116                    // support source stream job
117                    Format::Plain => vec![Encode::Json],
118                ),
119        ))
120    });
121
122fn validate_license(connector: &str) -> Result<()> {
123    if connector == SQL_SERVER_CDC_CONNECTOR {
124        Feature::SqlServerCdcSource
125            .check_available()
126            .map_err(|e| anyhow::anyhow!(e))?;
127    }
128    Ok(())
129}
130
131pub fn validate_compatibility(
132    format_encode: &FormatEncodeOptions,
133    props: &mut BTreeMap<String, String>,
134) -> Result<()> {
135    let mut connector = props
136        .get_connector()
137        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
138
139    if connector == OPENDAL_S3_CONNECTOR {
140        // reject s3_v2 creation
141        return Err(RwError::from(Deprecated(
142            OPENDAL_S3_CONNECTOR.to_owned(),
143            LEGACY_S3_CONNECTOR.to_owned(),
144        )));
145    }
146    if connector == LEGACY_S3_CONNECTOR {
147        // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead
148        // do s3 -> s3_v2 migration
149        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
150        *entry = OPENDAL_S3_CONNECTOR.to_owned();
151        connector = OPENDAL_S3_CONNECTOR.to_owned();
152    }
153
154    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
155        .get(&connector)
156        .ok_or_else(|| {
157            RwError::from(ProtocolError(format!(
158                "connector {:?} is not supported, accept {:?}",
159                connector,
160                CONNECTORS_COMPATIBLE_FORMATS.keys()
161            )))
162        })?;
163
164    validate_license(&connector)?;
165    if connector != KAFKA_CONNECTOR {
166        let res = match (&format_encode.format, &format_encode.row_encode) {
167            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
168                let mut options = WithOptions::try_from(format_encode.row_options())?;
169                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
170                use_schema_registry
171            }
172            (Format::Debezium, Encode::Avro) => true,
173            (_, _) => false,
174        };
175        if res {
176            return Err(RwError::from(ProtocolError(format!(
177                "The {} must be kafka when schema registry is used",
178                UPSTREAM_SOURCE_KEY
179            ))));
180        }
181    }
182
183    let compatible_encodes = compatible_formats
184        .get(&format_encode.format)
185        .ok_or_else(|| {
186            RwError::from(ProtocolError(format!(
187                "connector {} does not support format {:?}",
188                connector, format_encode.format
189            )))
190        })?;
191    if !compatible_encodes.contains(&format_encode.row_encode) {
192        return Err(RwError::from(ProtocolError(format!(
193            "connector {} does not support format {:?} with encode {:?}",
194            connector, format_encode.format, format_encode.row_encode
195        ))));
196    }
197
198    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
199        match props.get("slot.name") {
200            None => {
201                // Build a random slot name with UUID
202                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
203                let uuid = uuid::Uuid::new_v4();
204                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
205            }
206            Some(slot_name) => {
207                // please refer to
208                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
209                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
210                if !slot_name
211                    .chars()
212                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
213                    || slot_name.len() > 63
214                {
215                    return Err(RwError::from(ProtocolError(format!(
216                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
217                        slot_name
218                    ))));
219                }
220            }
221        }
222
223        if !props.contains_key("schema.name") {
224            // Default schema name is "public"
225            props.insert("schema.name".into(), "public".into());
226        }
227        if !props.contains_key("publication.name") {
228            // Default publication name is "rw_publication"
229            props.insert("publication.name".into(), "rw_publication".into());
230        }
231        if !props.contains_key("publication.create.enable") {
232            // Default auto create publication if doesn't exist
233            props.insert("publication.create.enable".into(), "true".into());
234        }
235    }
236
237    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
238        // Default schema name is "dbo"
239        props.insert("schema.name".into(), "dbo".into());
240    }
241
242    Ok(())
243}