Skip to main content

risingwave_connector/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(type_alias_impl_trait)]
22#![feature(associated_type_defaults)]
23#![feature(iter_from_coroutine)]
24#![feature(iterator_try_collect)]
25#![feature(try_blocks)]
26#![feature(error_generic_member_access)]
27#![feature(negative_impls)]
28#![feature(register_tool)]
29#![feature(never_type)]
30#![feature(map_try_insert)]
31#![register_tool(rw)]
32#![recursion_limit = "256"]
33#![feature(min_specialization)]
34#![feature(custom_inner_attributes)]
35#![feature(iter_array_chunks)]
36
37use std::time::Duration;
38
39use duration_str::parse_std;
40use serde::de;
41
42pub mod aws_utils;
43
44pub mod allow_alter_on_fly_fields;
45
46mod enforce_secret;
47pub mod error;
48mod macros;
49
50pub mod parser;
51pub mod schema;
52pub mod sink;
53pub mod source;
54
55pub mod connector_common;
56
57pub use paste::paste;
58pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
59
60mod with_options;
61pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
62
63#[cfg(test)]
64mod with_options_test;
65
/// Option key string `auto.schema.change`.
/// NOTE(review): presumably enables automatic schema-change handling — confirm at use sites.
pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
/// Sink option key string `create_table_if_not_exists`.
pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
/// Sink option key string `table.name` (the target table name, judging by the constant's name).
pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
/// Sink option key string `intermediate.table.name`.
pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
70
71pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
72where
73    D: de::Deserializer<'de>,
74{
75    let s: String = de::Deserialize::deserialize(deserializer)?;
76    s.parse().map_err(|_| {
77        de::Error::invalid_value(
78            de::Unexpected::Str(&s),
79            &"integer greater than or equal to 0",
80        )
81    })
82}
83
84pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
85    deserializer: D,
86) -> std::result::Result<Option<Vec<String>>, D::Error>
87where
88    D: de::Deserializer<'de>,
89{
90    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
91    if let Some(s) = s {
92        let s = s.to_ascii_lowercase();
93        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
94        Ok(Some(s))
95    } else {
96        Ok(None)
97    }
98}
99
100pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
101    deserializer: D,
102) -> std::result::Result<Option<Vec<u64>>, D::Error>
103where
104    D: de::Deserializer<'de>,
105{
106    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
107    if let Some(s) = s {
108        let numbers = s
109            .split(',')
110            .map(|s| s.trim().parse())
111            .collect::<Result<Vec<u64>, _>>()
112            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
113        Ok(Some(numbers?))
114    } else {
115        Ok(None)
116    }
117}
118
119pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
120where
121    D: de::Deserializer<'de>,
122{
123    let s: String = de::Deserialize::deserialize(deserializer)?;
124    let s = s.to_ascii_lowercase();
125    match s.as_str() {
126        "true" => Ok(true),
127        "false" => Ok(false),
128        _ => Err(de::Error::invalid_value(
129            de::Unexpected::Str(&s),
130            &"true or false",
131        )),
132    }
133}
134
135pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
136    deserializer: D,
137) -> std::result::Result<Option<bool>, D::Error>
138where
139    D: de::Deserializer<'de>,
140{
141    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
142    if let Some(s) = s {
143        let s = s.to_ascii_lowercase();
144        match s.as_str() {
145            "true" => Ok(Some(true)),
146            "false" => Ok(Some(false)),
147            _ => Err(de::Error::invalid_value(
148                de::Unexpected::Str(&s),
149                &"true or false",
150            )),
151        }
152    } else {
153        Ok(None)
154    }
155}
156
157pub(crate) fn deserialize_duration_from_string<'de, D>(
158    deserializer: D,
159) -> Result<Duration, D::Error>
160where
161    D: de::Deserializer<'de>,
162{
163    let s: String = de::Deserialize::deserialize(deserializer)?;
164    parse_std(&s).map_err(|_| de::Error::invalid_value(
165        de::Unexpected::Str(&s),
166        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
167    ))
168}
169
170pub(crate) fn deserialize_optional_duration_from_string<'de, D>(
171    deserializer: D,
172) -> Result<Option<Duration>, D::Error>
173where
174    D: de::Deserializer<'de>,
175{
176    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
177    if let Some(s) = s {
178        let duration = parse_std(&s).map_err(|_| de::Error::invalid_value(
179            de::Unexpected::Str(&s),
180            &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
181        ))?;
182        Ok(Some(duration))
183    } else {
184        Ok(None)
185    }
186}
187
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// This test ensures that the generated WITH-options snapshot files
    /// (`with_options_source.yaml`, `with_options_sink.yaml` and `with_options_connection.yaml`,
    /// located one directory above this source file) match the output of the corresponding
    /// `generate_with_options_yaml_*` functions. Developer should run
    /// `./risedev generate-with-options` to update them if this test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());

        expect_file!("../with_options_connection.yaml")
            .assert_eq(&generate_with_options_yaml_connection());
    }

    /// This test ensures that the generated `src/allow_alter_on_fly_fields.rs` file matches
    /// the output of `generate_allow_alter_on_fly_fields_combined`.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        expect_file!("../src/allow_alter_on_fly_fields.rs")
            .assert_eq(&generate_allow_alter_on_fly_fields_combined());
    }

    /// Tests pinning down `serde` behavior we rely on, namely how
    /// `deny_unknown_fields` interacts with `flatten`.
    mod serde {
        // The structs below exist only to be deserialized and `Debug`-printed: their fields
        // are never read directly, and `Inner11` is never used at all.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // Tests for `deny_unknown_fields` combined with `flatten`.

        // TL;DR: `deny_unknown_fields` on the outer struct
        // - doesn't work with a flattened map (even keys the map would accept are rejected)
        // - can work with a flattened struct
        // - doesn't work with a nested flattened struct (the nested `flatten` makes the
        //   flattened struct behave like a flattened map)
        // Also, `deny_unknown_fields` on a flattened inner struct has no effect.

        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // With `deny_unknown_fields`, we can't flatten ONLY a map: every key is rejected.
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // But we can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // ...and unknown fields are still denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both a struct and a map are flattened, the map also receives the
            // struct's fields (here `a`)...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // The error above is a little funny: it complains about the value's type
            // (the map holds `String` values), but even with a string value the input is
            // still rejected, this time as an unknown field:
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        #[test]
        fn test_inner_deny() {
            // No outer `deny_unknown_fields` now; only the flattened inner struct has it.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // Unknown fields are NOT denied: the inner `deny_unknown_fields` has no effect.
            // Presumably this is because the flattened struct is deserialized via
            // `deserialize_struct` with only its own field names, and the remaining fields
            // are left for the outer struct to consume.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                /// a second map sees the same leftover fields as the first one
                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                /// a later struct can still read fields the maps also received (`c`)
                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple `flatten` fields, all of them are populated.
            // Also, under an outer `flatten`, the inner `deny_unknown_fields` is ignored
            // (`Inner1` does not reject `c`).
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate: even the known field `a` is reported as unknown.
            expect_test::expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes `Inner` behave like a map.
            // Let's remove `deny_unknown_fields` and see:
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}