risingwave_frontend/handler/create_source/
validate.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
18    LazyLock::new(|| {
19        hashset! {
20            PbConnectionType::Unspecified,
21            PbConnectionType::Kafka,
22            PbConnectionType::Iceberg,
23        }
24    });
25
26pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
27    LazyLock::new(|| {
28        hashset! {
29            PbConnectionType::Unspecified,
30            PbConnectionType::SchemaRegistry,
31        }
32    });
33
// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array
//
// Compatibility table: for each connector name (stringified by `convert_args!`),
// the set of `FORMAT`s it accepts, and for each format the list of `ENCODE`s
// allowed with it. `validate_compatibility` consults this table to reject
// unsupported `FORMAT ... ENCODE ...` combinations at `CREATE SOURCE` time.
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        convert_args!(hashmap!(
                KAFKA_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json, Encode::Avro],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                PULSAR_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                KINESIS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                GOOGLE_PUBSUB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                // Built-in benchmark/test sources use the Native format internally.
                NEXMARK_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes],
                ),
                DATAGEN_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes, Encode::Json],
                ),
                // File-based (fs) connectors: append-only file formats only.
                OPENDAL_S3_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                GCS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                AZBLOB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                POSIX_FS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv],
                ),
                // CDC connectors consume Debezium-shaped JSON change events.
                MYSQL_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                POSTGRES_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                CITUS_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                ),
                MONGODB_CDC_CONNECTOR => hashmap!(
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                NATS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
                ),
                MQTT_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Bytes],
                ),
                TEST_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                // Iceberg sources carry no row format of their own.
                ICEBERG_CONNECTOR => hashmap!(
                    Format::None => vec![Encode::None],
                ),
                SQL_SERVER_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
        ))
    });
121
122fn validate_license(connector: &str) -> Result<()> {
123    if connector == SQL_SERVER_CDC_CONNECTOR {
124        Feature::SqlServerCdcSource.check_available()?;
125    }
126    Ok(())
127}
128
129pub fn validate_compatibility(
130    format_encode: &FormatEncodeOptions,
131    props: &mut BTreeMap<String, String>,
132) -> Result<()> {
133    let mut connector = props
134        .get_connector()
135        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
136
137    if connector == OPENDAL_S3_CONNECTOR {
138        // reject s3_v2 creation
139        return Err(RwError::from(Deprecated(
140            OPENDAL_S3_CONNECTOR.to_owned(),
141            LEGACY_S3_CONNECTOR.to_owned(),
142        )));
143    }
144    if connector == LEGACY_S3_CONNECTOR {
145        // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead
146        // do s3 -> s3_v2 migration
147        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
148        *entry = OPENDAL_S3_CONNECTOR.to_owned();
149        connector = OPENDAL_S3_CONNECTOR.to_owned();
150    }
151
152    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
153        .get(&connector)
154        .ok_or_else(|| {
155            RwError::from(ProtocolError(format!(
156                "connector {:?} is not supported, accept {:?}",
157                connector,
158                CONNECTORS_COMPATIBLE_FORMATS.keys()
159            )))
160        })?;
161
162    validate_license(&connector)?;
163    if connector != KAFKA_CONNECTOR {
164        let res = match (&format_encode.format, &format_encode.row_encode) {
165            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
166                let mut options = WithOptions::try_from(format_encode.row_options())?;
167                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
168                use_schema_registry
169            }
170            (Format::Debezium, Encode::Avro) => true,
171            (_, _) => false,
172        };
173        if res {
174            return Err(RwError::from(ProtocolError(format!(
175                "The {} must be kafka when schema registry is used",
176                UPSTREAM_SOURCE_KEY
177            ))));
178        }
179    }
180
181    let compatible_encodes = compatible_formats
182        .get(&format_encode.format)
183        .ok_or_else(|| {
184            RwError::from(ProtocolError(format!(
185                "connector {} does not support format {:?}",
186                connector, format_encode.format
187            )))
188        })?;
189    if !compatible_encodes.contains(&format_encode.row_encode) {
190        return Err(RwError::from(ProtocolError(format!(
191            "connector {} does not support format {:?} with encode {:?}",
192            connector, format_encode.format, format_encode.row_encode
193        ))));
194    }
195
196    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
197        match props.get("slot.name") {
198            None => {
199                // Build a random slot name with UUID
200                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
201                let uuid = uuid::Uuid::new_v4();
202                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
203            }
204            Some(slot_name) => {
205                // please refer to
206                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
207                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
208                if !slot_name
209                    .chars()
210                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
211                    || slot_name.len() > 63
212                {
213                    return Err(RwError::from(ProtocolError(format!(
214                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
215                        slot_name
216                    ))));
217                }
218            }
219        }
220
221        if !props.contains_key("schema.name") {
222            // Default schema name is "public"
223            props.insert("schema.name".into(), "public".into());
224        }
225        if !props.contains_key("publication.name") {
226            // Default publication name is "rw_publication"
227            props.insert("publication.name".into(), "rw_publication".into());
228        }
229        if !props.contains_key("publication.create.enable") {
230            // Default auto create publication if doesn't exist
231            props.insert("publication.create.enable".into(), "true".into());
232        }
233    }
234
235    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
236        // Default schema name is "dbo"
237        props.insert("schema.name".into(), "dbo".into());
238    }
239
240    Ok(())
241}