risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// Crate-wide clippy configuration: `derive_partial_eq_without_eq` is silenced, while
// oversized futures and stack frames are reported as warnings.
#![allow(clippy::derive_partial_eq_without_eq)]
#![warn(clippy::large_futures, clippy::large_stack_frames)]
// Unstable (nightly-only) language and library features used by this crate.
#![feature(coroutines)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_coroutine)]
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(error_generic_member_access)]
#![feature(negative_impls)]
#![feature(register_tool)]
#![feature(assert_matches)]
#![feature(never_type)]
#![feature(map_try_insert)]
// Registers `rw` as a tool namespace (enabled by `feature(register_tool)` above), so that
// tool-prefixed attributes/lints under `rw::` are accepted by the compiler.
#![register_tool(rw)]
// Raises the compiler's recursion limit above its default of 128.
// NOTE(review): presumably needed for deeply nested macro expansion or type resolution — confirm.
#![recursion_limit = "256"]
#![feature(min_specialization)]
#![feature(custom_inner_attributes)]
#![feature(iter_array_chunks)]
41
42use std::time::Duration;
43
44use duration_str::parse_std;
45use serde::de;
46
47pub mod aws_utils;
48
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
// ---- Sink option keys ----

/// Option key `table.name`.
/// NOTE(review): presumably names the sink's target table — confirm against its readers.
pub const SINK_TARGET_TABLE_NAME: &str = "table.name";

/// Option key `intermediate.table.name`.
/// NOTE(review): presumably names an intermediate/staging table used by some sinks — confirm.
pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";

/// Option key `create_table_if_not_exists`.
/// NOTE(review): presumably asks the sink to create its target table when missing — confirm.
pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";

// ---- Generic option keys ----

/// Option key `auto.schema.change`.
/// NOTE(review): presumably toggles automatic schema-change handling — confirm against its readers.
pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
75
76pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
77where
78    D: de::Deserializer<'de>,
79{
80    let s: String = de::Deserialize::deserialize(deserializer)?;
81    s.parse().map_err(|_| {
82        de::Error::invalid_value(
83            de::Unexpected::Str(&s),
84            &"integer greater than or equal to 0",
85        )
86    })
87}
88
89pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
90    deserializer: D,
91) -> std::result::Result<Option<Vec<String>>, D::Error>
92where
93    D: de::Deserializer<'de>,
94{
95    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
96    if let Some(s) = s {
97        let s = s.to_ascii_lowercase();
98        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
99        Ok(Some(s))
100    } else {
101        Ok(None)
102    }
103}
104
105pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
106    deserializer: D,
107) -> std::result::Result<Option<Vec<u64>>, D::Error>
108where
109    D: de::Deserializer<'de>,
110{
111    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
112    if let Some(s) = s {
113        let numbers = s
114            .split(',')
115            .map(|s| s.trim().parse())
116            .collect::<Result<Vec<u64>, _>>()
117            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
118        Ok(Some(numbers?))
119    } else {
120        Ok(None)
121    }
122}
123
124pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
125where
126    D: de::Deserializer<'de>,
127{
128    let s: String = de::Deserialize::deserialize(deserializer)?;
129    let s = s.to_ascii_lowercase();
130    match s.as_str() {
131        "true" => Ok(true),
132        "false" => Ok(false),
133        _ => Err(de::Error::invalid_value(
134            de::Unexpected::Str(&s),
135            &"true or false",
136        )),
137    }
138}
139
140pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
141    deserializer: D,
142) -> std::result::Result<Option<bool>, D::Error>
143where
144    D: de::Deserializer<'de>,
145{
146    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
147    if let Some(s) = s {
148        let s = s.to_ascii_lowercase();
149        match s.as_str() {
150            "true" => Ok(Some(true)),
151            "false" => Ok(Some(false)),
152            _ => Err(de::Error::invalid_value(
153                de::Unexpected::Str(&s),
154                &"true or false",
155            )),
156        }
157    } else {
158        Ok(None)
159    }
160}
161
162pub(crate) fn deserialize_duration_from_string<'de, D>(
163    deserializer: D,
164) -> Result<Duration, D::Error>
165where
166    D: de::Deserializer<'de>,
167{
168    let s: String = de::Deserialize::deserialize(deserializer)?;
169    parse_std(&s).map_err(|_| de::Error::invalid_value(
170        de::Unexpected::Str(&s),
171        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
172    ))
173}
174
175pub(crate) fn deserialize_optional_duration_from_string<'de, D>(
176    deserializer: D,
177) -> Result<Option<Duration>, D::Error>
178where
179    D: de::Deserializer<'de>,
180{
181    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
182    if let Some(s) = s {
183        let duration = parse_std(&s).map_err(|_| de::Error::invalid_value(
184            de::Unexpected::Str(&s),
185            &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
186        ))?;
187        Ok(Some(duration))
188    } else {
189        Ok(None)
190    }
191}
192
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// Ensures that the checked-in `with_options_source.yaml`, `with_options_sink.yaml` and
    /// `with_options_connection.yaml` files are up-to-date with the output of the matching
    /// `generate_with_options_yaml_*` functions. The paths are resolved relative to this source
    /// file, so the yaml files live in the crate root. Developers should run
    /// `./risedev generate-with-options` to update them if this test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());

        expect_file!("../with_options_connection.yaml")
            .assert_eq(&generate_with_options_yaml_connection());
    }

    /// Ensures that the generated `src/allow_alter_on_fly_fields.rs` matches the output of
    /// `generate_allow_alter_on_fly_fields_combined`.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        expect_file!("../src/allow_alter_on_fly_fields.rs")
            .assert_eq(&generate_allow_alter_on_fly_fields_combined());
    }

    /// Test some serde behavior we rely on.
    mod serde {
        #![expect(dead_code)]
        // The test structs below are only inspected through their derived `Debug` output, which
        // rustc's dead_code lint does not count as a field read. Some helper structs
        // (`Inner11`) are never constructed at all.

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // Tests for how `deny_unknown_fields` interacts with `flatten`.

        // TL;DR: deny_unknown_fields
        // - doesn't work with a flattened map
        // - can work with a flattened struct
        // - doesn't work with a nested flattened struct (nesting makes a flattened struct behave
        //   like a flattened map)

        /// `deny_unknown_fields` on the OUTER struct, combined with flattened maps and/or structs.
        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // with `deny_unknown_fields`, we can't flatten ONLY a map
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // but can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // unknown fields can be denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both struct and map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            // ...but unknown fields are still rejected (first via the map's value type).
            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // The error above is a little misleading: it comes from the flattened
            // `BTreeMap<String, String>` rejecting the integer. Even with a string value (below),
            // deserialization still fails, this time with the unknown-field error.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        /// `deny_unknown_fields` on the INNER (flattened) struct only.
        #[test]
        fn test_inner_deny() {
            // no outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        /// Several flattened structs and maps in one struct: every one of them sees the input.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not used by this test; only declared.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple flattened fields, all of them are populated.
            // Also, under an outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }

        /// A flattened struct that itself contains a flattened field (nested flatten).
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate: `Outer` rejects `a`, a field that `Inner` does declare.
            // Because `Inner` contains a flattened field, it does not consume `a`, so the outer
            // `deny_unknown_fields` still treats it as unknown.
            expect_test::expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        /// How `#[serde(flatten)]` on `Option<_>` fields decides between `Some` and `None`.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not used by this test; only declared.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}