risingwave_frontend/handler/create_source/validate.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::source::{ADBC_SNOWFLAKE_CONNECTOR, BATCH_POSIX_FS_CONNECTOR};
16
17use super::*;
18
19pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
20    LazyLock::new(|| {
21        hashset! {
22            PbConnectionType::Unspecified,
23            PbConnectionType::Kafka,
24            PbConnectionType::Iceberg,
25        }
26    });
27
28pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
29    LazyLock::new(|| {
30        hashset! {
31            PbConnectionType::Unspecified,
32            PbConnectionType::SchemaRegistry,
33        }
34    });
35
// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array
/// Compatibility matrix of `FORMAT ... ENCODE ...` combinations per connector.
///
/// Outer key: connector name (string); inner map: row format -> list of
/// accepted row encodes. Consulted by `validate_compatibility` to reject
/// unsupported combinations with a `ProtocolError`.
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
    LazyLock::new(|| {
        convert_args!(hashmap!(
                KAFKA_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                    Format::Debezium => vec![Encode::Json, Encode::Avro],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                PULSAR_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                KINESIS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
                    Format::Upsert => vec![Encode::Json, Encode::Avro],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                GOOGLE_PUBSUB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
                    Format::Debezium => vec![Encode::Json],
                    Format::Maxwell => vec![Encode::Json],
                    Format::Canal => vec![Encode::Json],
                ),
                NEXMARK_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes],
                ),
                DATAGEN_CONNECTOR => hashmap!(
                    Format::Native => vec![Encode::Native],
                    Format::Plain => vec![Encode::Bytes, Encode::Json],
                ),
                // File-based connectors all accept the same batch-friendly encodes.
                OPENDAL_S3_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                GCS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                AZBLOB_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                POSIX_FS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
                ),
                BATCH_POSIX_FS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Csv],
                ),
                MYSQL_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                POSTGRES_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
                CITUS_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                ),
                MONGODB_CDC_CONNECTOR => hashmap!(
                    Format::DebeziumMongo => vec![Encode::Json],
                ),
                NATS_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
                ),
                MQTT_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json, Encode::Bytes],
                ),
                TEST_CONNECTOR => hashmap!(
                    Format::Plain => vec![Encode::Json],
                ),
                // Iceberg / ADBC Snowflake sources carry no row format of their own.
                ICEBERG_CONNECTOR => hashmap!(
                    Format::None => vec![Encode::None],
                ),
                ADBC_SNOWFLAKE_CONNECTOR => hashmap!(
                    Format::None => vec![Encode::None],
                ),
                SQL_SERVER_CDC_CONNECTOR => hashmap!(
                    Format::Debezium => vec![Encode::Json],
                    // support source stream job
                    Format::Plain => vec![Encode::Json],
                ),
        ))
    });
129
130fn validate_license(connector: &str) -> Result<()> {
131    if connector == SQL_SERVER_CDC_CONNECTOR {
132        Feature::SqlServerCdcSource.check_available()?;
133    }
134    Ok(())
135}
136
137pub fn validate_compatibility(
138    format_encode: &FormatEncodeOptions,
139    props: &mut BTreeMap<String, String>,
140) -> Result<()> {
141    let mut connector = props
142        .get_connector()
143        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
144
145    if connector == OPENDAL_S3_CONNECTOR {
146        // reject s3_v2 creation
147        return Err(RwError::from(Deprecated(
148            OPENDAL_S3_CONNECTOR.to_owned(),
149            LEGACY_S3_CONNECTOR.to_owned(),
150        )));
151    }
152    if connector == LEGACY_S3_CONNECTOR {
153        // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead
154        // do s3 -> s3_v2 migration
155        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
156        *entry = OPENDAL_S3_CONNECTOR.to_owned();
157        connector = OPENDAL_S3_CONNECTOR.to_owned();
158    }
159
160    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
161        .get(&connector)
162        .ok_or_else(|| {
163            RwError::from(ProtocolError(format!(
164                "connector {:?} is not supported, accept {:?}",
165                connector,
166                CONNECTORS_COMPATIBLE_FORMATS.keys()
167            )))
168        })?;
169
170    validate_license(&connector)?;
171    if connector != KAFKA_CONNECTOR {
172        let res = match (&format_encode.format, &format_encode.row_encode) {
173            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
174                let mut options = WithOptions::try_from(format_encode.row_options())?;
175                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
176                use_schema_registry
177            }
178            (Format::Debezium, Encode::Avro) => true,
179            (_, _) => false,
180        };
181        if res {
182            return Err(RwError::from(ProtocolError(format!(
183                "The {} must be kafka when schema registry is used",
184                UPSTREAM_SOURCE_KEY
185            ))));
186        }
187    }
188
189    let compatible_encodes = compatible_formats
190        .get(&format_encode.format)
191        .ok_or_else(|| {
192            RwError::from(ProtocolError(format!(
193                "connector {} does not support format {:?}",
194                connector, format_encode.format
195            )))
196        })?;
197    if !compatible_encodes.contains(&format_encode.row_encode) {
198        return Err(RwError::from(ProtocolError(format!(
199            "connector {} does not support format {:?} with encode {:?}",
200            connector, format_encode.format, format_encode.row_encode
201        ))));
202    }
203
204    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
205        match props.get("slot.name") {
206            None => {
207                // Build a random slot name with UUID
208                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
209                let uuid = uuid::Uuid::new_v4();
210                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
211            }
212            Some(slot_name) => {
213                // please refer to
214                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
215                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
216                if !slot_name
217                    .chars()
218                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
219                    || slot_name.len() > 63
220                {
221                    return Err(RwError::from(ProtocolError(format!(
222                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
223                        slot_name
224                    ))));
225                }
226            }
227        }
228
229        if !props.contains_key("schema.name") {
230            // Default schema name is "public"
231            props.insert("schema.name".into(), "public".into());
232        }
233        if !props.contains_key("publication.name") {
234            // Default publication name is "rw_publication"
235            props.insert("publication.name".into(), "rw_publication".into());
236        }
237        if !props.contains_key("publication.create.enable") {
238            // Default auto create publication if doesn't exist
239            props.insert("publication.create.enable".into(), "true".into());
240        }
241    }
242
243    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
244        // Default schema name is "dbo"
245        props.insert("schema.name".into(), "dbo".into());
246    }
247
248    // Validate cdc.source.wait.streaming.start.timeout for all CDC connectors
249    if (connector == MYSQL_CDC_CONNECTOR
250        || connector == POSTGRES_CDC_CONNECTOR
251        || connector == CITUS_CDC_CONNECTOR
252        || connector == MONGODB_CDC_CONNECTOR
253        || connector == SQL_SERVER_CDC_CONNECTOR)
254        && let Some(timeout_value) = props.get("cdc.source.wait.streaming.start.timeout")
255        && timeout_value.parse::<u32>().is_err()
256    {
257        return Err(ErrorCode::InvalidConfigValue {
258            config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
259            config_value: timeout_value.to_owned(),
260        }
261        .into());
262    }
263
264    // Validate debezium.max.queue.size for all CDC connectors
265    if (connector == MYSQL_CDC_CONNECTOR
266        || connector == POSTGRES_CDC_CONNECTOR
267        || connector == CITUS_CDC_CONNECTOR
268        || connector == MONGODB_CDC_CONNECTOR
269        || connector == SQL_SERVER_CDC_CONNECTOR)
270        && let Some(queue_size_value) = props.get("debezium.max.queue.size")
271        && queue_size_value.parse::<u32>().is_err()
272    {
273        return Err(ErrorCode::InvalidConfigValue {
274            config_entry: "debezium.max.queue.size".to_owned(),
275            config_value: queue_size_value.to_owned(),
276        }
277        .into());
278    }
279
280    Ok(())
281}