risingwave_frontend/handler/create_source/
validate.rs1use risingwave_connector::source::{ADBC_SNOWFLAKE_CONNECTOR, BATCH_POSIX_FS_CONNECTOR};
16
17use super::*;
18
19pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
20 LazyLock::new(|| {
21 hashset! {
22 PbConnectionType::Unspecified,
23 PbConnectionType::Kafka,
24 PbConnectionType::Iceberg,
25 }
26 });
27
28pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
29 LazyLock::new(|| {
30 hashset! {
31 PbConnectionType::Unspecified,
32 PbConnectionType::SchemaRegistry,
33 }
34 });
35
36static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
38 LazyLock::new(|| {
39 convert_args!(hashmap!(
40 KAFKA_CONNECTOR => hashmap!(
41 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
42 Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
43 Format::Debezium => vec![Encode::Json, Encode::Avro],
44 Format::DebeziumMongo => vec![Encode::Json],
45 ),
46 PULSAR_CONNECTOR => hashmap!(
47 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
48 Format::Upsert => vec![Encode::Json, Encode::Avro],
49 Format::Debezium => vec![Encode::Json],
50 ),
51 KINESIS_CONNECTOR => hashmap!(
52 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
53 Format::Upsert => vec![Encode::Json, Encode::Avro],
54 Format::Debezium => vec![Encode::Json],
55 ),
56 GOOGLE_PUBSUB_CONNECTOR => hashmap!(
57 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
58 Format::Debezium => vec![Encode::Json],
59 ),
60 NEXMARK_CONNECTOR => hashmap!(
61 Format::Native => vec![Encode::Native],
62 Format::Plain => vec![Encode::Bytes],
63 ),
64 DATAGEN_CONNECTOR => hashmap!(
65 Format::Native => vec![Encode::Native],
66 Format::Plain => vec![Encode::Bytes, Encode::Json],
67 ),
68 OPENDAL_S3_CONNECTOR => hashmap!(
69 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
70 ),
71 GCS_CONNECTOR => hashmap!(
72 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
73 ),
74 AZBLOB_CONNECTOR => hashmap!(
75 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
76 ),
77 POSIX_FS_CONNECTOR => hashmap!(
78 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
79 ),
80 BATCH_POSIX_FS_CONNECTOR => hashmap!(
81 Format::Plain => vec![Encode::Csv],
82 ),
83 MYSQL_CDC_CONNECTOR => hashmap!(
84 Format::Debezium => vec![Encode::Json],
85 Format::Plain => vec![Encode::Json],
87 ),
88 POSTGRES_CDC_CONNECTOR => hashmap!(
89 Format::Debezium => vec![Encode::Json],
90 Format::Plain => vec![Encode::Json],
92 ),
93 MONGODB_CDC_CONNECTOR => hashmap!(
94 Format::DebeziumMongo => vec![Encode::Json],
95 ),
96 NATS_CONNECTOR => hashmap!(
97 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
98 ),
99 MQTT_CONNECTOR => hashmap!(
100 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
101 ),
102 TEST_CONNECTOR => hashmap!(
103 Format::Plain => vec![Encode::Json],
104 ),
105 ICEBERG_CONNECTOR => hashmap!(
106 Format::None => vec![Encode::None],
107 ),
108 ADBC_SNOWFLAKE_CONNECTOR => hashmap!(
109 Format::None => vec![Encode::None],
110 ),
111 SQL_SERVER_CDC_CONNECTOR => hashmap!(
112 Format::Debezium => vec![Encode::Json],
113 Format::Plain => vec![Encode::Json],
115 ),
116 ))
117 });
118
119fn validate_license(connector: &str) -> Result<()> {
120 if connector == SQL_SERVER_CDC_CONNECTOR {
121 Feature::SqlServerCdcSource.check_available()?;
122 }
123 Ok(())
124}
125
126pub fn validate_compatibility(
127 format_encode: &FormatEncodeOptions,
128 props: &mut BTreeMap<String, String>,
129) -> Result<()> {
130 let mut connector = props
131 .get_connector()
132 .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
133
134 if connector == OPENDAL_S3_CONNECTOR {
135 return Err(RwError::from(Deprecated(
137 OPENDAL_S3_CONNECTOR.to_owned(),
138 LEGACY_S3_CONNECTOR.to_owned(),
139 )));
140 }
141 if connector == LEGACY_S3_CONNECTOR {
142 let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
145 *entry = OPENDAL_S3_CONNECTOR.to_owned();
146 connector = OPENDAL_S3_CONNECTOR.to_owned();
147 }
148
149 let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
150 .get(&connector)
151 .ok_or_else(|| {
152 RwError::from(ProtocolError(format!(
153 "connector {:?} is not supported, accept {:?}",
154 connector,
155 CONNECTORS_COMPATIBLE_FORMATS.keys()
156 )))
157 })?;
158
159 validate_license(&connector)?;
160 if connector != KAFKA_CONNECTOR {
161 let res = match (&format_encode.format, &format_encode.row_encode) {
162 (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
163 let mut options = WithOptions::try_from(format_encode.row_options())?;
164 let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
165 use_schema_registry
166 }
167 (Format::Debezium, Encode::Avro) => true,
168 (_, _) => false,
169 };
170 if res {
171 return Err(RwError::from(ProtocolError(format!(
172 "The {} must be kafka when schema registry is used",
173 UPSTREAM_SOURCE_KEY
174 ))));
175 }
176 }
177
178 let compatible_encodes = compatible_formats
179 .get(&format_encode.format)
180 .ok_or_else(|| {
181 RwError::from(ProtocolError(format!(
182 "connector {} does not support format {:?}",
183 connector, format_encode.format
184 )))
185 })?;
186 if !compatible_encodes.contains(&format_encode.row_encode) {
187 return Err(RwError::from(ProtocolError(format!(
188 "connector {} does not support format {:?} with encode {:?}",
189 connector, format_encode.format, format_encode.row_encode
190 ))));
191 }
192
193 if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
194 match props.get("slot.name") {
195 None => {
196 let uuid = uuid::Uuid::new_v4();
199 props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
200 }
201 Some(slot_name) => {
202 if !slot_name
206 .chars()
207 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
208 || slot_name.len() > 63
209 {
210 return Err(RwError::from(ProtocolError(format!(
211 "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
212 slot_name
213 ))));
214 }
215 }
216 }
217
218 if !props.contains_key("schema.name") {
219 props.insert("schema.name".into(), "public".into());
221 }
222 if !props.contains_key("publication.name") {
223 let uuid = uuid::Uuid::new_v4();
226 props.insert(
227 "publication.name".into(),
228 format!("rw_publication_{}", uuid.simple()),
229 );
230 }
231 if !props.contains_key("publication.create.enable") {
232 props.insert("publication.create.enable".into(), "true".into());
234 }
235 }
236
237 if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
238 props.insert("schema.name".into(), "dbo".into());
240 }
241
242 if (connector == MYSQL_CDC_CONNECTOR
244 || connector == POSTGRES_CDC_CONNECTOR
245 || connector == CITUS_CDC_CONNECTOR
246 || connector == MONGODB_CDC_CONNECTOR
247 || connector == SQL_SERVER_CDC_CONNECTOR)
248 && let Some(timeout_value) = props.get("cdc.source.wait.streaming.start.timeout")
249 && timeout_value.parse::<u32>().is_err()
250 {
251 return Err(ErrorCode::InvalidConfigValue {
252 config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
253 config_value: timeout_value.to_owned(),
254 }
255 .into());
256 }
257
258 if (connector == MYSQL_CDC_CONNECTOR
260 || connector == POSTGRES_CDC_CONNECTOR
261 || connector == CITUS_CDC_CONNECTOR
262 || connector == MONGODB_CDC_CONNECTOR
263 || connector == SQL_SERVER_CDC_CONNECTOR)
264 && let Some(queue_size_value) = props.get("debezium.max.queue.size")
265 && queue_size_value.parse::<u32>().is_err()
266 {
267 return Err(ErrorCode::InvalidConfigValue {
268 config_entry: "debezium.max.queue.size".to_owned(),
269 config_value: queue_size_value.to_owned(),
270 }
271 .into());
272 }
273
274 Ok(())
275}