risingwave_frontend/handler/create_source/
validate.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::source::{ADBC_SNOWFLAKE_CONNECTOR, BATCH_POSIX_FS_CONNECTOR};
16
17use super::*;
18
19pub static SOURCE_ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> =
20    LazyLock::new(|| {
21        hashset! {
22            PbConnectionType::Unspecified,
23            PbConnectionType::Kafka,
24            PbConnectionType::Iceberg,
25        }
26    });
27
28pub static SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
29    LazyLock::new(|| {
30        hashset! {
31            PbConnectionType::Unspecified,
32            PbConnectionType::SchemaRegistry,
33        }
34    });
35
36// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array
37static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
38    LazyLock::new(|| {
39        convert_args!(hashmap!(
40                KAFKA_CONNECTOR => hashmap!(
41                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
42                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
43                    Format::Debezium => vec![Encode::Json, Encode::Avro],
44                    Format::DebeziumMongo => vec![Encode::Json],
45                ),
46                PULSAR_CONNECTOR => hashmap!(
47                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
48                    Format::Upsert => vec![Encode::Json, Encode::Avro],
49                    Format::Debezium => vec![Encode::Json],
50                ),
51                KINESIS_CONNECTOR => hashmap!(
52                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
53                    Format::Upsert => vec![Encode::Json, Encode::Avro],
54                    Format::Debezium => vec![Encode::Json],
55                ),
56                GOOGLE_PUBSUB_CONNECTOR => hashmap!(
57                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes],
58                    Format::Debezium => vec![Encode::Json],
59                ),
60                NEXMARK_CONNECTOR => hashmap!(
61                    Format::Native => vec![Encode::Native],
62                    Format::Plain => vec![Encode::Bytes],
63                ),
64                DATAGEN_CONNECTOR => hashmap!(
65                    Format::Native => vec![Encode::Native],
66                    Format::Plain => vec![Encode::Bytes, Encode::Json],
67                ),
68                OPENDAL_S3_CONNECTOR => hashmap!(
69                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
70                ),
71                GCS_CONNECTOR => hashmap!(
72                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
73                ),
74                AZBLOB_CONNECTOR => hashmap!(
75                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
76                ),
77                POSIX_FS_CONNECTOR => hashmap!(
78                    Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
79                ),
80                BATCH_POSIX_FS_CONNECTOR => hashmap!(
81                    Format::Plain => vec![Encode::Csv],
82                ),
83                MYSQL_CDC_CONNECTOR => hashmap!(
84                    Format::Debezium => vec![Encode::Json],
85                    // support source stream job
86                    Format::Plain => vec![Encode::Json],
87                ),
88                POSTGRES_CDC_CONNECTOR => hashmap!(
89                    Format::Debezium => vec![Encode::Json],
90                    // support source stream job
91                    Format::Plain => vec![Encode::Json],
92                ),
93                MONGODB_CDC_CONNECTOR => hashmap!(
94                    Format::DebeziumMongo => vec![Encode::Json],
95                ),
96                NATS_CONNECTOR => hashmap!(
97                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
98                ),
99                MQTT_CONNECTOR => hashmap!(
100                    Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Bytes],
101                ),
102                TEST_CONNECTOR => hashmap!(
103                    Format::Plain => vec![Encode::Json],
104                ),
105                ICEBERG_CONNECTOR => hashmap!(
106                    Format::None => vec![Encode::None],
107                ),
108                ADBC_SNOWFLAKE_CONNECTOR => hashmap!(
109                    Format::None => vec![Encode::None],
110                ),
111                SQL_SERVER_CDC_CONNECTOR => hashmap!(
112                    Format::Debezium => vec![Encode::Json],
113                    // support source stream job
114                    Format::Plain => vec![Encode::Json],
115                ),
116        ))
117    });
118
119fn validate_license(connector: &str) -> Result<()> {
120    if connector == SQL_SERVER_CDC_CONNECTOR {
121        Feature::SqlServerCdcSource.check_available()?;
122    }
123    Ok(())
124}
125
126pub fn validate_compatibility(
127    format_encode: &FormatEncodeOptions,
128    props: &mut BTreeMap<String, String>,
129) -> Result<()> {
130    let mut connector = props
131        .get_connector()
132        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_owned())))?;
133
134    if connector == OPENDAL_S3_CONNECTOR {
135        // reject s3_v2 creation
136        return Err(RwError::from(Deprecated(
137            OPENDAL_S3_CONNECTOR.to_owned(),
138            LEGACY_S3_CONNECTOR.to_owned(),
139        )));
140    }
141    if connector == LEGACY_S3_CONNECTOR {
142        // S3 connector is deprecated, use OPENDAL_S3_CONNECTOR instead
143        // do s3 -> s3_v2 migration
144        let entry = props.get_mut(UPSTREAM_SOURCE_KEY).unwrap();
145        *entry = OPENDAL_S3_CONNECTOR.to_owned();
146        connector = OPENDAL_S3_CONNECTOR.to_owned();
147    }
148
149    let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
150        .get(&connector)
151        .ok_or_else(|| {
152            RwError::from(ProtocolError(format!(
153                "connector {:?} is not supported, accept {:?}",
154                connector,
155                CONNECTORS_COMPATIBLE_FORMATS.keys()
156            )))
157        })?;
158
159    validate_license(&connector)?;
160    if connector != KAFKA_CONNECTOR {
161        let res = match (&format_encode.format, &format_encode.row_encode) {
162            (Format::Plain, Encode::Protobuf) | (Format::Plain, Encode::Avro) => {
163                let mut options = WithOptions::try_from(format_encode.row_options())?;
164                let (_, use_schema_registry) = get_schema_location(options.inner_mut())?;
165                use_schema_registry
166            }
167            (Format::Debezium, Encode::Avro) => true,
168            (_, _) => false,
169        };
170        if res {
171            return Err(RwError::from(ProtocolError(format!(
172                "The {} must be kafka when schema registry is used",
173                UPSTREAM_SOURCE_KEY
174            ))));
175        }
176    }
177
178    let compatible_encodes = compatible_formats
179        .get(&format_encode.format)
180        .ok_or_else(|| {
181            RwError::from(ProtocolError(format!(
182                "connector {} does not support format {:?}",
183                connector, format_encode.format
184            )))
185        })?;
186    if !compatible_encodes.contains(&format_encode.row_encode) {
187        return Err(RwError::from(ProtocolError(format!(
188            "connector {} does not support format {:?} with encode {:?}",
189            connector, format_encode.format, format_encode.row_encode
190        ))));
191    }
192
193    if connector == POSTGRES_CDC_CONNECTOR || connector == CITUS_CDC_CONNECTOR {
194        match props.get("slot.name") {
195            None => {
196                // Build a random slot name with UUID
197                // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
198                let uuid = uuid::Uuid::new_v4();
199                props.insert("slot.name".into(), format!("rw_cdc_{}", uuid.simple()));
200            }
201            Some(slot_name) => {
202                // please refer to
203                // - https://github.com/debezium/debezium/blob/97956ce25b7612e3413d363658661896b7d2e0a2/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java#L1179
204                // - https://doxygen.postgresql.org/slot_8c.html#afac399f07320b9adfd2c599cf822aaa3
205                if !slot_name
206                    .chars()
207                    .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
208                    || slot_name.len() > 63
209                {
210                    return Err(RwError::from(ProtocolError(format!(
211                        "Invalid replication slot name: {:?}. Valid replication slot name must contain only digits, lowercase characters and underscores with length <= 63",
212                        slot_name
213                    ))));
214                }
215            }
216        }
217
218        if !props.contains_key("schema.name") {
219            // Default schema name is "public"
220            props.insert("schema.name".into(), "public".into());
221        }
222        if !props.contains_key("publication.name") {
223            // Build a random publication name with UUID to avoid conflicts between sources
224            // e.g. "rw_publication_f9a3567e6dd54bf5900444c8b1c03815"
225            let uuid = uuid::Uuid::new_v4();
226            props.insert(
227                "publication.name".into(),
228                format!("rw_publication_{}", uuid.simple()),
229            );
230        }
231        if !props.contains_key("publication.create.enable") {
232            // Default auto create publication if doesn't exist
233            props.insert("publication.create.enable".into(), "true".into());
234        }
235    }
236
237    if connector == SQL_SERVER_CDC_CONNECTOR && !props.contains_key("schema.name") {
238        // Default schema name is "dbo"
239        props.insert("schema.name".into(), "dbo".into());
240    }
241
242    // Validate cdc.source.wait.streaming.start.timeout for all CDC connectors
243    if (connector == MYSQL_CDC_CONNECTOR
244        || connector == POSTGRES_CDC_CONNECTOR
245        || connector == CITUS_CDC_CONNECTOR
246        || connector == MONGODB_CDC_CONNECTOR
247        || connector == SQL_SERVER_CDC_CONNECTOR)
248        && let Some(timeout_value) = props.get("cdc.source.wait.streaming.start.timeout")
249        && timeout_value.parse::<u32>().is_err()
250    {
251        return Err(ErrorCode::InvalidConfigValue {
252            config_entry: "cdc.source.wait.streaming.start.timeout".to_owned(),
253            config_value: timeout_value.to_owned(),
254        }
255        .into());
256    }
257
258    // Validate debezium.max.queue.size for all CDC connectors
259    if (connector == MYSQL_CDC_CONNECTOR
260        || connector == POSTGRES_CDC_CONNECTOR
261        || connector == CITUS_CDC_CONNECTOR
262        || connector == MONGODB_CDC_CONNECTOR
263        || connector == SQL_SERVER_CDC_CONNECTOR)
264        && let Some(queue_size_value) = props.get("debezium.max.queue.size")
265        && queue_size_value.parse::<u32>().is_err()
266    {
267        return Err(ErrorCode::InvalidConfigValue {
268            config_entry: "debezium.max.queue.size".to_owned(),
269            config_value: queue_size_value.to_owned(),
270        }
271        .into());
272    }
273
274    Ok(())
275}