risingwave_frontend/handler/create_source/validate.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR;

use super::*;

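/// Connection types that may back a `CREATE SOURCE` connector.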
pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::Kafka,
            PbConnectionType::Iceberg,
        }
    });

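/// Connection types that may back a source's schema registry.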
pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
    LazyLock::new(|| {
        hashset! {
            PbConnectionType::Unspecified,
            PbConnectionType::SchemaRegistry,
        }
    });

// TODO: better design if we want to support ENCODE KEY, which would require a 4-dimensional array.
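/// For every supported connector, the `FORMAT`s it accepts and, per format, the `ENCODE`s
/// that may be combined with it.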
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        convert_args!(hashmap!(
                KAFKA_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json, Encode::Avro],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                PULSAR_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                KINESIS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                GOOGLE_PUBSUB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                NEXMARK_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes],
                ),
                DATAGEN_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes, Encode::Json],
                ),
                OPENDAL_S3_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                GCS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                AZBLOB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                POSIX_FS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                BATCH_POSIX_FS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv],
                ),
                MYSQL_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                POSTGRES_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                CITUS_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                ),
                MONGODB_CDC_CONNECTOR => hashmap!(
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                NATS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
                ),
                MQTT_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Bytes],
                ),
                TEST_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                ICEBERG_CONNECTOR => hashmap!(
                    Format::None => vec![Encode::None],
                ),
                SQL_SERVER_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
        ))
    });

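/// Rejects connectors that are gated behind a license feature when that feature is not
/// available; currently only SQL Server CDC is license-gated here.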
fn validate_license(connector: &str) -> Result<()> {
    if connector == SQL_SERVER_CDC_CONNECTOR {
        Feature::SqlServerCdcSource.check_available()?;
    }
    Ok(())
}

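/// Checks that the `FORMAT ... ENCODE ...` clause is compatible with the connector named in
/// the `WITH` properties, and normalizes `props` in place: the legacy `s3` connector is
/// migrated to the OpenDAL-based one, and missing CDC properties (replication slot, schema
/// name, publication settings) are filled with defaults.
///
/// For example, a statement along the lines of
/// `CREATE SOURCE t WITH (connector = 'kafka', ...) FORMAT UPSERT ENCODE AVRO (...)`
/// passes this check, while `FORMAT UPSERT ENCODE CSV` would be rejected for `kafka`.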
pub fn validate_compatibility(
    format_encode: &FormatEncodeOptions,
    props: &mut BTreeMap<String, String>,
) -> Result<()> {
    let mut connector = props
        .get_connector()
        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;

    if connector == OPENDAL_S3_CONNECTOR {
        // Reject explicit `s3_v2` creation; users should write `s3`, which is migrated below.
        return Err(RwError::from(Deprecated(
            OPENDAL_S3_CONNECTOR.to_owned(),
            LEGACY_S3_CONNECTOR.to_owned(),
        )));
    }
    if connector == LEGACY_S3_CONNECTOR {
        // The legacy S3 implementation is deprecated in favor of the OpenDAL-based one;
        // transparently migrate s3 -> s3_v2.
        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
        *entry = OPENDAL_S3_CONNECTOR.to_owned();
        connector = OPENDAL_S3_CONNECTOR.to_owned();
    }

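    // Look up which FORMAT/ENCODE combinations this connector supports at all.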
    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
        .get(&connector)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {:?} is not supported, accepted connectors: {:?}",
                connector,
                CONNECTORS_COMPATIBLE_FORMATS.keys()
            )))
        })?;

    validate_license(&connector)?;
    // A schema registry can only be used with the kafka connector.
    if connector != KAFKA_CONNECTOR {
        let res = match (&format_encode.format, &format_encode.row_encode) {
            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
                let mut options = WithOptions::try_from(format_encode.row_options())?;
                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
                use_schema_registry
            }
            (Format::Debezium, Encode::Avro) => true,
            (_, _) => false,
        };
        if res {
            return Err(RwError::from(ProtocolError(format!(
                "the {} must be 'kafka' when a schema registry is used",
                UPSTREAM_SOURCE_KEY
            ))));
        }
    }

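    // First check that the format is supported, then the encode within that format.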
    let compatible_encodes = compatible_formats
        .get(&format_encode.format)
        .ok_or_else(|| {
            RwError::from(ProtocolError(format!(
                "connector {} does not support format {:?}",
                connector, format_encode.format
            )))
        })?;
    if !compatible_encodes.contains(&format_encode.row_encode) {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support format {:?} with encode {:?}",
            connector, format_encode.format, format_encode.row_encode
        ))));
    }

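    // Postgres-based CDC needs a replication slot, a schema name, and a publication;
    // fill in defaults for whatever was not specified.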
    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
        match props.get("slot.name") {
            None => {
                // Build a random slot name with UUID
                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
                let uuid = uuid::Uuid::new_v4();
                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
            }
            Some(slot_name) => {
                // please refer to
                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
                if !slot_name
                    .chars()
                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
                    || slot_name.len() > 63
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Invalid replication slot name: {:?}. A valid slot name may contain only lowercase letters, digits and underscores, and must be at most 63 characters long",
                        slot_name
                    ))));
                }
            }
        }

        if !props.contains_key("schema.name") {
            // Default schema name is "public"
            props.insert("schema.name".into(), "public".into());
        }
        if !props.contains_key("publication.name") {
            // Default publication name is "rw_publication"
            props.insert("publication.name".into(), "rw_publication".into());
        }
        if !props.contains_key("publication.create.enable") {
            // By default, create the publication automatically if it doesn't exist
            props.insert("publication.create.enable".into(), "true".into());
        }
    }

    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
        // Default schema name is "dbo"
        props.insert("schema.name".into(), "dbo".into());
    }

    Ok(())
}
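
// A minimal sanity check over the compatibility table. This is a sketch only: the crate's
// actual test layout may differ, and it assumes the `Format`/`Encode` types and connector
// name constants are in scope through `super::*`, as they are in the code above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn kafka_supports_upsert_avro_but_not_upsert_csv() {
        let kafka = CONNECTORS_COMPATIBLE_FORMATS
            .get(KAFKA_CONNECTOR)
            .expect("kafka must be listed in the compatibility table");
        let upsert = &kafka[&Format::Upsert];
        assert!(upsert.contains(&Encode::Avro));
        assert!(!upsert.contains(&Encode::Csv));
    }
}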