risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39
40use std::time::Duration;
41
42use duration_str::parse_std;
43use serde::de;
44
45pub mod aws_utils;
46mod enforce_secret;
47pub mod error;
48mod macros;
49
50pub mod parser;
51pub mod schema;
52pub mod sink;
53pub mod source;
54
55pub mod connector_common;
56
57pub use paste::paste;
58pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
59
60mod with_options;
61pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
62
63#[cfg(test)]
64mod with_options_test;
65
66pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
67where
68    D: de::Deserializer<'de>,
69{
70    let s: String = de::Deserialize::deserialize(deserializer)?;
71    s.parse().map_err(|_| {
72        de::Error::invalid_value(
73            de::Unexpected::Str(&s),
74            &"integer greater than or equal to 0",
75        )
76    })
77}
78
79pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
80    deserializer: D,
81) -> std::result::Result<Option<Vec<String>>, D::Error>
82where
83    D: de::Deserializer<'de>,
84{
85    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
86    if let Some(s) = s {
87        let s = s.to_ascii_lowercase();
88        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
89        Ok(Some(s))
90    } else {
91        Ok(None)
92    }
93}
94
95pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
96    deserializer: D,
97) -> std::result::Result<Option<Vec<u64>>, D::Error>
98where
99    D: de::Deserializer<'de>,
100{
101    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
102    if let Some(s) = s {
103        let numbers = s
104            .split(',')
105            .map(|s| s.trim().parse())
106            .collect::<Result<Vec<u64>, _>>()
107            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
108        Ok(Some(numbers?))
109    } else {
110        Ok(None)
111    }
112}
113
114pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
115where
116    D: de::Deserializer<'de>,
117{
118    let s: String = de::Deserialize::deserialize(deserializer)?;
119    let s = s.to_ascii_lowercase();
120    match s.as_str() {
121        "true" => Ok(true),
122        "false" => Ok(false),
123        _ => Err(de::Error::invalid_value(
124            de::Unexpected::Str(&s),
125            &"true or false",
126        )),
127    }
128}
129
130pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
131    deserializer: D,
132) -> std::result::Result<Option<bool>, D::Error>
133where
134    D: de::Deserializer<'de>,
135{
136    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
137    if let Some(s) = s {
138        let s = s.to_ascii_lowercase();
139        match s.as_str() {
140            "true" => Ok(Some(true)),
141            "false" => Ok(Some(false)),
142            _ => Err(de::Error::invalid_value(
143                de::Unexpected::Str(&s),
144                &"true or false",
145            )),
146        }
147    } else {
148        Ok(None)
149    }
150}
151
152pub(crate) fn deserialize_duration_from_string<'de, D>(
153    deserializer: D,
154) -> Result<Duration, D::Error>
155where
156    D: de::Deserializer<'de>,
157{
158    let s: String = de::Deserialize::deserialize(deserializer)?;
159    parse_std(&s).map_err(|_| de::Error::invalid_value(
160        de::Unexpected::Str(&s),
161        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
162    ))
163}
164
165#[cfg(test)]
166mod tests {
167    use expect_test::expect_file;
168
169    use crate::with_options_test::{
170        generate_with_options_yaml_sink, generate_with_options_yaml_source,
171    };
172
    /// This test ensures that `../with_options_source.yaml` and `../with_options_sink.yaml`
    /// are up-to-date with the output of `generate_with_options_yaml_source` and
    /// `generate_with_options_yaml_sink` respectively. Developer should run
    /// `./risedev generate-with-options` to update them if this test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
    }
182
183    /// Test some serde behavior we rely on.
184    mod serde {
185        #![expect(dead_code)]
186
187        use std::collections::BTreeMap;
188
189        use expect_test::expect;
190        use serde::Deserialize;
191
192        // test deny_unknown_fields and flatten
193
194        // TL;DR: deny_unknown_fields
195        // - doesn't work with flatten map
196        // - can work with flatten struct
197        // - doesn't work with nested flatten struct (This makes a flatten struct behave like a flatten map)
198
        /// Records how `deny_unknown_fields` on the OUTER struct interacts with
        /// `#[serde(flatten)]` on a map, on a struct, and on both at once.
        #[test]
        fn test_outer_deny() {
            // Outer deny + a flattened map only.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            // Outer deny + a flattened struct only.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            // Outer deny + both a flattened map and a flattened struct.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            // The flattened struct itself has no `deny_unknown_fields` in this test.
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // With `deny_unknown_fields`, we can't flatten ONLY a map: every key is rejected.
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // But we can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // With a flattened struct, unknown fields are still denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both a struct and a map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            // ...but a non-string unknown value fails with a type error from the
            // `BTreeMap<String, String>` rather than an "unknown field" error.
            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // The error above is a little funny, since even if we use a string value, it will
            // still fail, this time as an unknown field.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }
304
        /// Records that `deny_unknown_fields` placed only on the flattened INNER struct does
        /// not reject unknown fields.
        #[test]
        fn test_inner_deny() {
            // No outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            // The deny attribute lives on the flattened struct only.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // Unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
341
        /// Records what each field receives when one struct flattens two structs and two maps
        /// at the same time.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                /// A second flattened map receives the same entries as the first one.
                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                /// A later flattened struct can still read a key that the maps also captured.
                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; unused in this test.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple flatten, all of them will be used.
            // Also, with outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }
409
        /// Records that a flattened struct which itself contains a `flatten` field breaks an
        /// outer `deny_unknown_fields`, and that its keys are not consumed.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            // `Inner` is flattened into the outer struct and flattens `InnerInner` in turn.
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate: even the known field `a` is reported as unknown.
            expect_test::expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see.
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }
476
        /// Records how `#[serde(flatten)]` behaves on `Option<struct>` and `Option<map>`
        /// fields, depending on which keys are present in the input.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            // All fields optional.
            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; unused in this test.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            // `d` is required and is absent from the JSON below.
            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            // `f` is required and is present in the JSON below.
            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
554    }
555}