risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39
40use std::time::Duration;
41
42use duration_str::parse_std;
43use serde::de;
44
45pub mod aws_utils;
46pub mod error;
47mod macros;
48
49pub mod parser;
50pub mod schema;
51pub mod sink;
52pub mod source;
53
54pub mod connector_common;
55
56pub use paste::paste;
57pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
58
59mod with_options;
60pub use with_options::{WithOptionsSecResolved, WithPropertiesExt};
61
62#[cfg(test)]
63mod with_options_test;
64
65pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
66where
67    D: de::Deserializer<'de>,
68{
69    let s: String = de::Deserialize::deserialize(deserializer)?;
70    s.parse().map_err(|_| {
71        de::Error::invalid_value(
72            de::Unexpected::Str(&s),
73            &"integer greater than or equal to 0",
74        )
75    })
76}
77
78pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
79    deserializer: D,
80) -> std::result::Result<Option<Vec<String>>, D::Error>
81where
82    D: de::Deserializer<'de>,
83{
84    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
85    if let Some(s) = s {
86        let s = s.to_ascii_lowercase();
87        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
88        Ok(Some(s))
89    } else {
90        Ok(None)
91    }
92}
93
94pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
95    deserializer: D,
96) -> std::result::Result<Option<Vec<u64>>, D::Error>
97where
98    D: de::Deserializer<'de>,
99{
100    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
101    if let Some(s) = s {
102        let numbers = s
103            .split(',')
104            .map(|s| s.trim().parse())
105            .collect::<Result<Vec<u64>, _>>()
106            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
107        Ok(Some(numbers?))
108    } else {
109        Ok(None)
110    }
111}
112
113pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
114where
115    D: de::Deserializer<'de>,
116{
117    let s: String = de::Deserialize::deserialize(deserializer)?;
118    let s = s.to_ascii_lowercase();
119    match s.as_str() {
120        "true" => Ok(true),
121        "false" => Ok(false),
122        _ => Err(de::Error::invalid_value(
123            de::Unexpected::Str(&s),
124            &"true or false",
125        )),
126    }
127}
128
129pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
130    deserializer: D,
131) -> std::result::Result<Option<bool>, D::Error>
132where
133    D: de::Deserializer<'de>,
134{
135    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
136    if let Some(s) = s {
137        let s = s.to_ascii_lowercase();
138        match s.as_str() {
139            "true" => Ok(Some(true)),
140            "false" => Ok(Some(false)),
141            _ => Err(de::Error::invalid_value(
142                de::Unexpected::Str(&s),
143                &"true or false",
144            )),
145        }
146    } else {
147        Ok(None)
148    }
149}
150
151pub(crate) fn deserialize_duration_from_string<'de, D>(
152    deserializer: D,
153) -> Result<Duration, D::Error>
154where
155    D: de::Deserializer<'de>,
156{
157    let s: String = de::Deserialize::deserialize(deserializer)?;
158    parse_std(&s).map_err(|_| de::Error::invalid_value(
159        de::Unexpected::Str(&s),
160        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
161    ))
162}
163
// Crate-level tests: the generated WITH-options YAML snapshots, plus the serde behaviors
// this crate relies on.
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// This test ensures that `src/connector/with_options_source.yaml` and
    /// `src/connector/with_options_sink.yaml` are up-to-date with the output of
    /// `generate_with_options_yaml_source` / `generate_with_options_yaml_sink`.
    /// Developers should run `./risedev generate-with-options` to update them if this
    /// test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
    }

    /// Test some serde behavior we rely on.
    mod serde {
        // The test structs below are only inspected through their derived `Debug` output,
        // and `Inner11` is never used at all, so the `dead_code` lint is expected to fire.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // test deny_unknown_fields and flatten

        // TL;DR: deny_unknown_fields
        // - doesn't work with flatten map
        // - can work with flatten struct
        // - doesn't work with nested flatten struct (This makes a flatten struct behave like a flatten map)

        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // with `deny_unknown_fields`, we can't flatten ONLY a map
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // but can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // unknown fields can be denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both struct and map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // The error above is a little funny: it complains about the value type, yet even
            // with a string value (below) the unknown field is still rejected.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        #[test]
        fn test_inner_deny() {
            // no outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple `flatten` fields, all of them are populated: `a` is
            // consumed by `Inner1`, while both maps (and `Inner2`) still see `c`.
            // Also, with outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate: `a` is reported as unknown even though `Inner`
            // declares it.
            expect_test::expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}