risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![feature(map_try_insert)]
37#![register_tool(rw)]
38#![recursion_limit = "256"]
39#![feature(min_specialization)]
40
41use std::time::Duration;
42
43use duration_str::parse_std;
44use serde::de;
45
46pub mod aws_utils;
47
48#[rustfmt::skip]
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
/// The option key string `"auto.schema.change"`.
///
/// NOTE(review): presumably the `WITH` option name used to toggle automatic schema change;
/// confirm against the call sites, which are not visible from this file.
pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
72
73pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
74where
75    D: de::Deserializer<'de>,
76{
77    let s: String = de::Deserialize::deserialize(deserializer)?;
78    s.parse().map_err(|_| {
79        de::Error::invalid_value(
80            de::Unexpected::Str(&s),
81            &"integer greater than or equal to 0",
82        )
83    })
84}
85
86pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
87    deserializer: D,
88) -> std::result::Result<Option<Vec<String>>, D::Error>
89where
90    D: de::Deserializer<'de>,
91{
92    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
93    if let Some(s) = s {
94        let s = s.to_ascii_lowercase();
95        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
96        Ok(Some(s))
97    } else {
98        Ok(None)
99    }
100}
101
102pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
103    deserializer: D,
104) -> std::result::Result<Option<Vec<u64>>, D::Error>
105where
106    D: de::Deserializer<'de>,
107{
108    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
109    if let Some(s) = s {
110        let numbers = s
111            .split(',')
112            .map(|s| s.trim().parse())
113            .collect::<Result<Vec<u64>, _>>()
114            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
115        Ok(Some(numbers?))
116    } else {
117        Ok(None)
118    }
119}
120
121pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
122where
123    D: de::Deserializer<'de>,
124{
125    let s: String = de::Deserialize::deserialize(deserializer)?;
126    let s = s.to_ascii_lowercase();
127    match s.as_str() {
128        "true" => Ok(true),
129        "false" => Ok(false),
130        _ => Err(de::Error::invalid_value(
131            de::Unexpected::Str(&s),
132            &"true or false",
133        )),
134    }
135}
136
137pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
138    deserializer: D,
139) -> std::result::Result<Option<bool>, D::Error>
140where
141    D: de::Deserializer<'de>,
142{
143    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
144    if let Some(s) = s {
145        let s = s.to_ascii_lowercase();
146        match s.as_str() {
147            "true" => Ok(Some(true)),
148            "false" => Ok(Some(false)),
149            _ => Err(de::Error::invalid_value(
150                de::Unexpected::Str(&s),
151                &"true or false",
152            )),
153        }
154    } else {
155        Ok(None)
156    }
157}
158
159pub(crate) fn deserialize_duration_from_string<'de, D>(
160    deserializer: D,
161) -> Result<Duration, D::Error>
162where
163    D: de::Deserializer<'de>,
164{
165    let s: String = de::Deserialize::deserialize(deserializer)?;
166    parse_std(&s).map_err(|_| de::Error::invalid_value(
167        de::Unexpected::Str(&s),
168        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
169    ))
170}
171
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// Checks that the checked-in `with_options_source.yaml`, `with_options_sink.yaml` and
    /// `with_options_connection.yaml` files match what the generators currently produce.
    /// Developers should run `./risedev generate-with-options` to refresh them if this test
    /// fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        let source_yaml = generate_with_options_yaml_source();
        expect_file!("../with_options_source.yaml").assert_eq(&source_yaml);

        let sink_yaml = generate_with_options_yaml_sink();
        expect_file!("../with_options_sink.yaml").assert_eq(&sink_yaml);

        let connection_yaml = generate_with_options_yaml_connection();
        expect_file!("../with_options_connection.yaml").assert_eq(&connection_yaml);
    }

    /// Checks that the generated `allow_alter_on_fly_fields.rs` Rust file is up-to-date.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        let generated = generate_allow_alter_on_fly_fields_combined();
        expect_file!("../src/allow_alter_on_fly_fields.rs").assert_eq(&generated);
    }

    /// Pins down some serde behavior we rely on.
    mod serde {
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // How `deny_unknown_fields` interacts with `flatten`.
        //
        // Summary: `deny_unknown_fields`
        // - does not work with a flattened map
        // - can work with a flattened struct
        // - does not work with a nested flattened struct (the nesting makes a flattened struct
        //   behave like a flattened map)

        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let map_only = serde_json::from_str::<FlattenMap>(json);
            let struct_only = serde_json::from_str::<FlattenStruct>(json);
            let map_and_struct = serde_json::from_str::<FlattenBoth>(json);

            // Under `deny_unknown_fields`, flattening ONLY a map is rejected.
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&map_only);

            // Flattening a struct is fine, though.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&struct_only);

            // ...and unknown fields are denied as expected.
            let struct_only_unknown =
                serde_json::from_str::<FlattenStruct>(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&struct_only_unknown);

            // With both a struct and a map flattened, the map works as well...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&map_and_struct);

            let both_unknown_int =
                serde_json::from_str::<FlattenBoth>(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&both_unknown_int);

            // The error above is a little funny: switching the value to a string still fails.
            let both_unknown_str =
                serde_json::from_str::<FlattenBoth>(r#"{ "a": "b", "unknown":"1" }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&both_unknown_str);
        }

        #[test]
        fn test_inner_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            // Note: no `deny_unknown_fields` on the outer struct this time.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let parsed = serde_json::from_str::<FlattenStruct>(json);

            // Unknown fields are NOT denied here.
            // I think this is because `deserialize_struct` is called, and required fields are
            // passed. Other fields are left for the outer struct to consume.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&parsed);
        }

        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Foo {
                /// A flattened struct "consumes" the fields it uses.
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// A flattened map keeps the fields that are still unknown.
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let parsed = serde_json::from_str::<Foo>(json);

            // With several `flatten` fields, every one of them gets populated.
            // Also, under an outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&parsed);
        }

        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            // Same as `Outer` but without `deny_unknown_fields`, plus a flattened map.
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// Shows that the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let denied = serde_json::from_str::<Outer>(json);

            // This is very unfortunate...
            expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&denied);

            // In fact, the nested `flatten` makes the struct behave like a map.
            // Drop `deny_unknown_fields` and look at what ends up in the map.
            let with_map = serde_json::from_str::<Outer2>(json);
            expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&with_map);
        }

        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            #[derive(Deserialize, Debug)]
            struct Foo {
                /// A flattened optional struct can still consume the field.
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// A flattened optional map is always `Some`.
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// A flattened optional struct is `None` if its required field is absent.
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// A flattened optional struct is `Some` if its required field is present, even
                /// when the optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`.
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let parsed = serde_json::from_str::<Foo>(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&parsed);
        }
    }
}