risingwave_frontend/handler/create_source/validate.rs

use risingwave_connector::source::{ADBC_SNOWFLAKE_CONNECTOR, BATCH_POSIX_FS_CONNECTOR};

use super::*;

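/// Connection types that a source connector may reference.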
pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
        }
    });

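/// Connection types that a source may use for its schema registry.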
pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

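/// For each connector, the supported formats and, per format, the supported row encodings.
/// `validate_compatibility` rejects any `FORMAT ... ENCODE ...` combination not listed here.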
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        convert_args!(hashmap!(
            KAFKA_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json, Encode::Avro],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
                Format::DebeziumMongo => vec![Encode::Json],
            ),
            PULSAR_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                Format::Upsert => vec![Encode::Json, Encode::Avro],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            KINESIS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                Format::Upsert => vec![Encode::Json, Encode::Avro],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            GOOGLE_PUBSUB_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            NEXMARK_CONNECTOR => hashmap!(
                Format::Native => vec![Encode::Native],
                Format::Plain => vec![Encode::Bytes],
            ),
            DATAGEN_CONNECTOR => hashmap!(
                Format::Native => vec![Encode::Native],
                Format::Plain => vec![Encode::Bytes, Encode::Json],
            ),
            OPENDAL_S3_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            GCS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            AZBLOB_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            POSIX_FS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            BATCH_POSIX_FS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv],
            ),
            MYSQL_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
            POSTGRES_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
            CITUS_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
            ),
            MONGODB_CDC_CONNECTOR => hashmap!(
                Format::DebeziumMongo => vec![Encode::Json],
            ),
            NATS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
            ),
            MQTT_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Bytes],
            ),
            TEST_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            ICEBERG_CONNECTOR => hashmap!(
                Format::None => vec![Encode::None],
            ),
            ADBC_SNOWFLAKE_CONNECTOR => hashmap!(
                Format::None => vec![Encode::None],
            ),
            SQL_SERVER_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
        ))
    });

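/// Checks that the connector is covered by the current license;
/// the SQL Server CDC source is gated behind a licensed feature.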
fn validate_license(connector: &str) -> Result<()> {
    if connector == SQL_SERVER_CDC_CONNECTOR {
        Feature::SqlServerCdcSource.check_available()?;
    }
    Ok(())
}

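/// Validates that the requested `FORMAT ... ENCODE ...` combination is supported by the
/// connector in `props`, and normalizes connector-specific properties in place
/// (e.g. filling in CDC defaults). For example, `FORMAT PLAIN ENCODE JSON` on a Kafka
/// source is accepted, while `FORMAT UPSERT ENCODE CSV` is rejected.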
pub fn validate_compatibility(
    format_encode: &FormatEncodeOptions,
    props: &mut BTreeMap<String, String>,
) -> Result<()> {
    let mut connector = props
        .get_connector()
        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;

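    // `OPENDAL_S3_CONNECTOR` can no longer be specified directly; the legacy S3 connector is
    // accepted and silently rewritten to the OpenDAL-based one below.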
    if connector == OPENDAL_S3_CONNECTOR {
        return Err(RwError::from(Deprecated(
            OPENDAL_S3_CONNECTOR.to_owned(),
            LEGACY_S3_CONNECTOR.to_owned(),
        )));
    }
    if connector == LEGACY_S3_CONNECTOR {
        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
        *entry = OPENDAL_S3_CONNECTOR.to_owned();
        connector = OPENDAL_S3_CONNECTOR.to_owned();
    }

    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(&connector)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {:?} is not supported, accept {:?}",
                connector,
                CONNECTORS_COMPATIBLE_FORMATS.keys()
            )))
        })?;

    validate_license(&connector)?;
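    // Using a schema registry (Avro/Protobuf schema location) requires the Kafka connector.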
    if connector != KAFKA_CONNECTOR {
        let res = match (&format_encode.format, &format_encode.row_encode) {
            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
                let mut options = WithOptions::try_from(format_encode.row_options())?;
                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
                use_schema_registry
            }
            (Format::Debezium, Encode::Avro) => true,
            (_, _) => false,
        };
        if res {
            return Err(RwError::from(ProtocolError(format!(
                "The {} must be kafka when schema registry is used",
                UPSTREAM_SOURCE_KEY
            ))));
        }
    }

    let compatible_encodes = compatible_formats
        .get(&format_encode.format)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {} does not support format {:?}",
                connector, format_encode.format
            )))
        })?;
    if !compatible_encodes.contains(&format_encode.row_encode) {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_encode.format, format_encode.row_encode
        ))));
    }

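    // Postgres/Citus CDC: generate or validate the replication slot name and fill in
    // property defaults.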
    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
        match props.get("slot.name") {
            None => {
                let uuid = uuid::Uuid::new_v4();
                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
            }
            Some(slot_name) => {
                if !slot_name
                    .chars()
                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
                    || slot_name.len() > 63
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
                        slot_name
                    ))));
                }
            }
        }

        if !props.contains_key("schema.name") {
            props.insert("schema.name".into(), "public".into());
        }
        if !props.contains_key("publication.name") {
            props.insert("publication.name".into(), "rw_publication".into());
        }
        if !props.contains_key("publication.create.enable") {
            props.insert("publication.create.enable".into(), "true".into());
        }
    }

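    // SQL Server CDC defaults to the `dbo` schema when no schema is specified.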
    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
        props.insert("schema.name".into(), "dbo".into());
    }

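    // CDC tuning options, when provided, must parse as valid u32 values.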
    if (connector == MYSQL_CDC_CONNECTOR
        || connector == POSTGRES_CDC_CONNECTOR
        || connector == CITUS_CDC_CONNECTOR
        || connector == MONGODB_CDC_CONNECTOR
        || connector == SQL_SERVER_CDC_CONNECTOR)
        && let Some(timeout_value) = props.get("cdc.source.wait.streaming.start.timeout")
        && timeout_value.parse::<u32>().is_err()
    {
        return Err(ErrorCode::InvalidConfigValue {
            config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
            config_value: timeout_value.to_owned(),
        }
        .into());
    }

    if (connector == MYSQL_CDC_CONNECTOR
        || connector == POSTGRES_CDC_CONNECTOR
        || connector == CITUS_CDC_CONNECTOR
        || connector == MONGODB_CDC_CONNECTOR
        || connector == SQL_SERVER_CDC_CONNECTOR)
        && let Some(queue_size_value) = props.get("debezium.max.queue.size")
        && queue_size_value.parse::<u32>().is_err()
    {
        return Err(ErrorCode::InvalidConfigValue {
            config_entry: "debezium.max.queue.size".to_owned(),
            config_value: queue_size_value.to_owned(),
        }
        .into());
    }

    Ok(())
}