risingwave_frontend/handler/create_source/
validate.rs1use super::*;
16
17pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
18 LazyLock::new(|| {
19 hashset! {
20 PbConnectionType::Unspecified,
21 PbConnectionType::Kafka,
22 PbConnectionType::Iceberg,
23 }
24 });
25
26pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
27 LazyLock::new(|| {
28 hashset! {
29 PbConnectionType::Unspecified,
30 PbConnectionType::SchemaRegistry,
31 }
32 });
33
34static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
36 LazyLock::new(|| {
37 convert_args!(hashmap!(
38 KAFKA_CONNECTOR => hashmap!(
39 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
40 Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
41 Format::Debezium => vec![Encode::Json, Encode::Avro],
42 Format::Maxwell => vec![Encode::Json],
43 Format::Canal => vec![Encode::Json],
44 Format::DebeziumMongo => vec![Encode::Json],
45 ),
46 PULSAR_CONNECTOR => hashmap!(
47 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
48 Format::Upsert => vec![Encode::Json, Encode::Avro],
49 Format::Debezium => vec![Encode::Json],
50 Format::Maxwell => vec![Encode::Json],
51 Format::Canal => vec![Encode::Json],
52 ),
53 KINESIS_CONNECTOR => hashmap!(
54 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
55 Format::Upsert => vec![Encode::Json, Encode::Avro],
56 Format::Debezium => vec![Encode::Json],
57 Format::Maxwell => vec![Encode::Json],
58 Format::Canal => vec![Encode::Json],
59 ),
60 GOOGLE_PUBSUB_CONNECTOR => hashmap!(
61 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
62 Format::Debezium => vec![Encode::Json],
63 Format::Maxwell => vec![Encode::Json],
64 Format::Canal => vec![Encode::Json],
65 ),
66 NEXMARK_CONNECTOR => hashmap!(
67 Format::Native => vec![Encode::Native],
68 Format::Plain => vec![Encode::Bytes],
69 ),
70 DATAGEN_CONNECTOR => hashmap!(
71 Format::Native => vec![Encode::Native],
72 Format::Plain => vec![Encode::Bytes, Encode::Json],
73 ),
74 OPENDAL_S3_CONNECTOR => hashmap!(
75 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
76 ),
77 GCS_CONNECTOR => hashmap!(
78 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
79 ),
80 AZBLOB_CONNECTOR => hashmap!(
81 Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
82 ),
83 POSIX_FS_CONNECTOR => hashmap!(
84 Format::Plain => vec![Encode::Csv],
85 ),
86 MYSQL_CDC_CONNECTOR => hashmap!(
87 Format::Debezium => vec![Encode::Json],
88 Format::Plain => vec![Encode::Json],
90 ),
91 POSTGRES_CDC_CONNECTOR => hashmap!(
92 Format::Debezium => vec![Encode::Json],
93 Format::Plain => vec![Encode::Json],
95 ),
96 CITUS_CDC_CONNECTOR => hashmap!(
97 Format::Debezium => vec![Encode::Json],
98 ),
99 MONGODB_CDC_CONNECTOR => hashmap!(
100 Format::DebeziumMongo => vec![Encode::Json],
101 ),
102 NATS_CONNECTOR => hashmap!(
103 Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
104 ),
105 MQTT_CONNECTOR => hashmap!(
106 Format::Plain => vec![Encode::Json, Encode::Bytes],
107 ),
108 TEST_CONNECTOR => hashmap!(
109 Format::Plain => vec![Encode::Json],
110 ),
111 ICEBERG_CONNECTOR => hashmap!(
112 Format::None => vec![Encode::None],
113 ),
114 SQL_SERVER_CDC_CONNECTOR => hashmap!(
115 Format::Debezium => vec![Encode::Json],
116 Format::Plain => vec![Encode::Json],
118 ),
119 ))
120 });
121
122fn validate_license(connector: &str) -> Result<()> {
123 if connector == SQL_SERVER_CDC_CONNECTOR {
124 Feature::SqlServerCdcSource
125 .check_available()
126 .map_err(|e| anyhow::anyhow!(e))?;
127 }
128 Ok(())
129}
130
131pub fn validate_compatibility(
132 format_encode: &FormatEncodeOptions,
133 props: &mut BTreeMap<String, String>,
134) -> Result<()> {
135 let mut connector = props
136 .get_connector()
137 .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
138
139 if connector == OPENDAL_S3_CONNECTOR {
140 return Err(RwError::from(Deprecated(
142 OPENDAL_S3_CONNECTOR.to_owned(),
143 LEGACY_S3_CONNECTOR.to_owned(),
144 )));
145 }
146 if connector == LEGACY_S3_CONNECTOR {
147 let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
150 *entry = OPENDAL_S3_CONNECTOR.to_owned();
151 connector = OPENDAL_S3_CONNECTOR.to_owned();
152 }
153
154 let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
155 .get(&connector)
156 .ok_or_else(|| {
157 RwError::from(ProtocolError(format!(
158 "connector {:?} is not supported, accept {:?}",
159 connector,
160 CONNECTORS_COMPATIBLE_FORMATS.keys()
161 )))
162 })?;
163
164 validate_license(&connector)?;
165 if connector != KAFKA_CONNECTOR {
166 let res = match (&format_encode.format, &format_encode.row_encode) {
167 (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
168 let mut options = WithOptions::try_from(format_encode.row_options())?;
169 let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
170 use_schema_registry
171 }
172 (Format::Debezium, Encode::Avro) => true,
173 (_, _) => false,
174 };
175 if res {
176 return Err(RwError::from(ProtocolError(format!(
177 "The {} must be kafka when schema registry is used",
178 UPSTREAM_SOURCE_KEY
179 ))));
180 }
181 }
182
183 let compatible_encodes = compatible_formats
184 .get(&format_encode.format)
185 .ok_or_else(|| {
186 RwError::from(ProtocolError(format!(
187 "connector {} does not support format {:?}",
188 connector, format_encode.format
189 )))
190 })?;
191 if !compatible_encodes.contains(&format_encode.row_encode) {
192 return Err(RwError::from(ProtocolError(format!(
193 "connector {} does not support format {:?} with encode {:?}",
194 connector, format_encode.format, format_encode.row_encode
195 ))));
196 }
197
198 if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
199 match props.get("slot.name") {
200 None => {
201 let uuid = uuid::Uuid::new_v4();
204 props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
205 }
206 Some(slot_name) => {
207 if !slot_name
211 .chars()
212 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
213 || slot_name.len() > 63
214 {
215 return Err(RwError::from(ProtocolError(format!(
216 "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
217 slot_name
218 ))));
219 }
220 }
221 }
222
223 if !props.contains_key("schema.name") {
224 props.insert("schema.name".into(), "public".into());
226 }
227 if !props.contains_key("publication.name") {
228 props.insert("publication.name".into(), "rw_publication".into());
230 }
231 if !props.contains_key("publication.create.enable") {
232 props.insert("publication.create.enable".into(), "true".into());
234 }
235 }
236
237 if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
238 props.insert("schema.name".into(), "dbo".into());
240 }
241
242 Ok(())
243}