risingwave_frontend/handler/create_source/validate.rs

use risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR;

use super::*;

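/// Connection types that a source's `connector` itself may be bound to (as opposed to
/// its schema registry): currently Kafka and Iceberg connections, plus `Unspecified`.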
pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
        }
    });

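/// Connection types accepted when a source references a connection for its schema
/// registry rather than for the connector itself.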
pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

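/// The `FORMAT ... ENCODE ...` combinations accepted by each supported source
/// connector. `validate_compatibility` rejects any pair that is not listed here.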
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        convert_args!(hashmap!(
            KAFKA_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                Format::Debezium => vec![Encode::Json, Encode::Avro],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
                Format::DebeziumMongo => vec![Encode::Json],
            ),
            PULSAR_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                Format::Upsert => vec![Encode::Json, Encode::Avro],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            KINESIS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                Format::Upsert => vec![Encode::Json, Encode::Avro],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            GOOGLE_PUBSUB_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                Format::Debezium => vec![Encode::Json],
                Format::Maxwell => vec![Encode::Json],
                Format::Canal => vec![Encode::Json],
            ),
            NEXMARK_CONNECTOR => hashmap!(
                Format::Native => vec![Encode::Native],
                Format::Plain => vec![Encode::Bytes],
            ),
            DATAGEN_CONNECTOR => hashmap!(
                Format::Native => vec![Encode::Native],
                Format::Plain => vec![Encode::Bytes, Encode::Json],
            ),
            OPENDAL_S3_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            GCS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            AZBLOB_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            POSIX_FS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
            ),
            BATCH_POSIX_FS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Csv],
            ),
            MYSQL_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
            POSTGRES_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
            CITUS_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
            ),
            MONGODB_CDC_CONNECTOR => hashmap!(
                Format::DebeziumMongo => vec![Encode::Json],
            ),
            NATS_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
            ),
            MQTT_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json, Encode::Bytes],
            ),
            TEST_CONNECTOR => hashmap!(
                Format::Plain => vec![Encode::Json],
            ),
            ICEBERG_CONNECTOR => hashmap!(
                Format::None => vec![Encode::None],
            ),
            SQL_SERVER_CDC_CONNECTOR => hashmap!(
                Format::Debezium => vec![Encode::Json],
                Format::Plain => vec![Encode::Json],
            ),
        ))
    });

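/// Some connectors are gated behind a licensed feature; currently only the SQL Server
/// CDC connector requires a license check.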
fn validate_license(connector: &str) -> Result<()> {
    if connector == SQL_SERVER_CDC_CONNECTOR {
        Feature::SqlServerCdcSource.check_available()?;
    }
    Ok(())
}

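/// Validates that the connector named in `props` supports the requested
/// `FORMAT ... ENCODE ...` combination, rejects or rewrites the S3 connector aliases,
/// and fills in CDC defaults (replication slot, schema and publication names).
///
/// A sketch of the kind of statement this guards; the Kafka properties shown are
/// illustrative:
///
/// ```sql
/// CREATE SOURCE s (v1 int)
/// WITH (connector = 'kafka', topic = 'demo', properties.bootstrap.server = 'mq:9092')
/// FORMAT PLAIN ENCODE JSON;
/// ```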
pub fn validate_compatibility(
    format_encode: &FormatEncodeOptions,
    props: &mut BTreeMap<String, String>,
) -> Result<()> {
    let mut connector = props
        .get_connector()
        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;

    if connector == OPENDAL_S3_CONNECTOR {
        // Specifying `OPENDAL_S3_CONNECTOR` directly is rejected with a deprecation error.
        return Err(RwError::from(Deprecated(
            OPENDAL_S3_CONNECTOR.to_owned(),
            LEGACY_S3_CONNECTOR.to_owned(),
        )));
    }
    if connector == LEGACY_S3_CONNECTOR {
        // `LEGACY_S3_CONNECTOR` is accepted and rewritten to `OPENDAL_S3_CONNECTOR`,
        // both in the props and in the local connector name used below.
        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
        *entry = OPENDAL_S3_CONNECTOR.to_owned();
        connector = OPENDAL_S3_CONNECTOR.to_owned();
    }

    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(&connector)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {:?} is not supported, accept {:?}",
                connector,
                CONNECTORS_COMPATIBLE_FORMATS.keys()
            )))
        })?;

    validate_license(&connector)?;
    // A schema registry may only be used together with the Kafka connector.
    if connector != KAFKA_CONNECTOR {
        let res = match (&format_encode.format, &format_encode.row_encode) {
            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
                let mut options = WithOptions::try_from(format_encode.row_options())?;
                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
                use_schema_registry
            }
            (Format::Debezium, Encode::Avro) => true,
            (_, _) => false,
        };
        if res {
            return Err(RwError::from(ProtocolError(format!(
                "The {} must be kafka when schema registry is used",
                UPSTREAM_SOURCE_KEY
            ))));
        }
    }

    let compatible_encodes = compatible_formats
        .get(&format_encode.format)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {} does not support format {:?}",
                connector, format_encode.format
            )))
        })?;
    if !compatible_encodes.contains(&format_encode.row_encode) {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_encode.format, format_encode.row_encode
        ))));
    }

    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
        match props.get("slot.name") {
            None => {
                // Generate a random replication slot name if the user did not provide one.
                let uuid = uuid::Uuid::new_v4();
                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
            }
            Some(slot_name) => {
                // Replication slot names may only contain lowercase letters, digits and
                // underscores, and must be at most 63 characters long.
                if !slot_name
                    .chars()
                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
                    || slot_name.len() > 63
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
                        slot_name
                    ))));
                }
            }
        }

        if !props.contains_key("schema.name") {
            props.insert("schema.name".into(), "public".into());
        }
        if !props.contains_key("publication.name") {
            props.insert("publication.name".into(), "rw_publication".into());
        }
        if !props.contains_key("publication.create.enable") {
            props.insert("publication.create.enable".into(), "true".into());
        }
    }

    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
        props.insert("schema.name".into(), "dbo".into());
    }

    Ok(())
}
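
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity-check sketch of how the compatibility table is consulted. It
    // relies only on what the code above already assumes: `Format` and `Encode` are
    // hashable and comparable, and `KAFKA_CONNECTOR` is a key of the table.
    #[test]
    fn kafka_accepts_upsert_avro_but_not_native() {
        let kafka_formats = CONNECTORS_COMPATIBLE_FORMATS
            .get(KAFKA_CONNECTOR)
            .expect("kafka should be listed as a supported connector");
        assert!(kafka_formats[&Format::Upsert].contains(&Encode::Avro));
        // Kafka does not list FORMAT NATIVE at all.
        assert!(!kafka_formats.contains_key(&Format::Native));
    }
}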