risingwave_connector/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
// Crate-wide lint configuration: silence clippy's `derive_partial_eq_without_eq` lint.
#![allow(clippy::derive_partial_eq_without_eq)]
// Unstable (nightly-only) language and library features enabled for this crate.
#![feature(array_chunks)]
#![feature(coroutines)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_coroutine)]
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(error_generic_member_access)]
#![feature(negative_impls)]
#![feature(register_tool)]
#![feature(assert_matches)]
#![feature(never_type)]
#![feature(map_try_insert)]
// Registers `rw` as a known tool name (requires the `register_tool` feature enabled above).
#![register_tool(rw)]
// Sets the compiler's recursion limit for this crate to 256.
#![recursion_limit = "256"]
// One more unstable feature gate, listed apart from the group above.
#![feature(min_specialization)]
40
41use std::time::Duration;
42
43use duration_str::parse_std;
44use serde::de;
45
pub mod aws_utils;

// Excluded from rustfmt. The test `test_allow_alter_on_fly_fields_rust_up_to_date` in this file
// compares this module's source file against generated output, so it is presumably
// generated code — confirm before editing it by hand.
#[rustfmt::skip]
pub mod allow_alter_on_fly_fields;

// Crate-private modules are declared with plain `mod`; public ones with `pub mod`.
mod enforce_secret;
pub mod error;
mod macros;

pub mod parser;
pub mod schema;
pub mod sink;
pub mod source;

pub mod connector_common;

// Re-exports of items from external crates.
// NOTE(review): presumably re-exported so that macros exported by this crate can reach them
// through the crate path — confirm against the macro definitions.
pub use paste::paste;
pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};

// `with_options` itself stays private; only the items below are part of the public API.
mod with_options;
pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};

// Test-only module; it provides the generator functions used by the `tests` module in this file.
#[cfg(test)]
mod with_options_test;
70
71pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
72where
73    D: de::Deserializer<'de>,
74{
75    let s: String = de::Deserialize::deserialize(deserializer)?;
76    s.parse().map_err(|_| {
77        de::Error::invalid_value(
78            de::Unexpected::Str(&s),
79            &"integer greater than or equal to 0",
80        )
81    })
82}
83
84pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
85    deserializer: D,
86) -> std::result::Result<Option<Vec<String>>, D::Error>
87where
88    D: de::Deserializer<'de>,
89{
90    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
91    if let Some(s) = s {
92        let s = s.to_ascii_lowercase();
93        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
94        Ok(Some(s))
95    } else {
96        Ok(None)
97    }
98}
99
100pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
101    deserializer: D,
102) -> std::result::Result<Option<Vec<u64>>, D::Error>
103where
104    D: de::Deserializer<'de>,
105{
106    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
107    if let Some(s) = s {
108        let numbers = s
109            .split(',')
110            .map(|s| s.trim().parse())
111            .collect::<Result<Vec<u64>, _>>()
112            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
113        Ok(Some(numbers?))
114    } else {
115        Ok(None)
116    }
117}
118
119pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
120where
121    D: de::Deserializer<'de>,
122{
123    let s: String = de::Deserialize::deserialize(deserializer)?;
124    let s = s.to_ascii_lowercase();
125    match s.as_str() {
126        "true" => Ok(true),
127        "false" => Ok(false),
128        _ => Err(de::Error::invalid_value(
129            de::Unexpected::Str(&s),
130            &"true or false",
131        )),
132    }
133}
134
135pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
136    deserializer: D,
137) -> std::result::Result<Option<bool>, D::Error>
138where
139    D: de::Deserializer<'de>,
140{
141    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
142    if let Some(s) = s {
143        let s = s.to_ascii_lowercase();
144        match s.as_str() {
145            "true" => Ok(Some(true)),
146            "false" => Ok(Some(false)),
147            _ => Err(de::Error::invalid_value(
148                de::Unexpected::Str(&s),
149                &"true or false",
150            )),
151        }
152    } else {
153        Ok(None)
154    }
155}
156
157pub(crate) fn deserialize_duration_from_string<'de, D>(
158    deserializer: D,
159) -> Result<Duration, D::Error>
160where
161    D: de::Deserializer<'de>,
162{
163    let s: String = de::Deserialize::deserialize(deserializer)?;
164    parse_std(&s).map_err(|_| de::Error::invalid_value(
165        de::Unexpected::Str(&s),
166        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
167    ))
168}
169
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_sink,
        generate_with_options_yaml_source,
    };

    /// This test ensures that `with_options_source.yaml` and `with_options_sink.yaml` (located
    /// one directory above this file) are up-to-date with the output of the generators in
    /// `with_options_test`. Developers should run `./risedev generate-with-options` to update
    /// them if this test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
    }

    /// This test ensures that the `allow_alter_on_fly` fields Rust file is up-to-date.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        expect_file!("../src/allow_alter_on_fly_fields.rs")
            .assert_eq(&generate_allow_alter_on_fly_fields_combined());
    }

    /// Test some serde behavior we rely on.
    mod serde {
        // The structs below exist only to be deserialized and printed via `Debug`.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // test deny_unknown_fields and flatten

        // TL;DR: deny_unknown_fields
        // - doesn't work with flatten map
        // - can work with flatten struct
        // - doesn't work with nested flatten struct (This makes a flatten struct behave like a flatten map)

        /// `deny_unknown_fields` on the outer struct, combined with flattened maps and structs.
        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            // NOTE: the exact layout of this literal matters; the expected error positions
            // below (line/column) refer to it.
            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // with `deny_unknown_fields`, we can't flatten ONLY a map
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // but can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // unknown fields can be denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both struct and map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // The error above is a little funny, since even if we use a string, it will still fail.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        /// `deny_unknown_fields` only on the flattened inner struct, not on the outer one.
        #[test]
        fn test_inner_deny() {
            // no outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        /// Several flattened fields (structs and maps) on the same outer struct.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple flatten, all of them will be used.
            // Also, with outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }

        /// A flattened struct which itself contains a flattened field.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate...
            expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` will make the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        /// Flattened fields wrapped in `Option`.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}