risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(array_chunks)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene)]
20#![feature(stmt_expr_attributes)]
21#![feature(box_patterns)]
22#![feature(trait_alias)]
23#![feature(let_chains)]
24#![feature(box_into_inner)]
25#![feature(type_alias_impl_trait)]
26#![feature(associated_type_defaults)]
27#![feature(impl_trait_in_assoc_type)]
28#![feature(iter_from_coroutine)]
29#![feature(if_let_guard)]
30#![feature(iterator_try_collect)]
31#![feature(try_blocks)]
32#![feature(error_generic_member_access)]
33#![feature(negative_impls)]
34#![feature(register_tool)]
35#![feature(assert_matches)]
36#![feature(never_type)]
37#![feature(map_try_insert)]
38#![register_tool(rw)]
39#![recursion_limit = "256"]
40#![feature(min_specialization)]
41#![feature(custom_inner_attributes)]
42
43use std::time::Duration;
44
45use duration_str::parse_std;
46use serde::de;
47
pub mod aws_utils;

// This file is generated and is checked byte-for-byte by
// `tests::test_allow_alter_on_fly_fields_rust_up_to_date` against
// `generate_allow_alter_on_fly_fields_combined()`. Presumably `rustfmt::skip` keeps rustfmt
// from reformatting it, which would make that check fail.
#[rustfmt::skip]
pub mod allow_alter_on_fly_fields;

mod enforce_secret;
pub mod error;
// NOTE(review): module declaration order can matter for `macro_rules!` textual scoping; keep
// `macros` ahead of the modules below unless it is verified that none of them rely on it.
mod macros;

pub mod parser;
pub mod schema;
pub mod sink;
pub mod source;

pub mod connector_common;

// Public re-exports of external items through this crate's root.
// NOTE(review): presumably needed so macros expanding in downstream crates can reach them via
// this crate — confirm against the macro definitions.
pub use paste::paste;
pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};

mod with_options;
pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};

// Test-only generators used by the `tests` module at the bottom of this file to check the
// `with_options_*.yaml` files and `allow_alter_on_fly_fields.rs`.
#[cfg(test)]
mod with_options_test;

// Property key strings. NOTE(review): their lookup sites are outside this file — confirm the
// exact semantics there.
/// Property key `auto.schema.change`.
pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
/// Sink property key `create_table_if_not_exists`.
pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
/// Sink property key `table.name`.
pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
/// Sink property key `intermediate.table.name`.
pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
77
78pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
79where
80    D: de::Deserializer<'de>,
81{
82    let s: String = de::Deserialize::deserialize(deserializer)?;
83    s.parse().map_err(|_| {
84        de::Error::invalid_value(
85            de::Unexpected::Str(&s),
86            &"integer greater than or equal to 0",
87        )
88    })
89}
90
91pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
92    deserializer: D,
93) -> std::result::Result<Option<Vec<String>>, D::Error>
94where
95    D: de::Deserializer<'de>,
96{
97    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
98    if let Some(s) = s {
99        let s = s.to_ascii_lowercase();
100        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
101        Ok(Some(s))
102    } else {
103        Ok(None)
104    }
105}
106
107pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
108    deserializer: D,
109) -> std::result::Result<Option<Vec<u64>>, D::Error>
110where
111    D: de::Deserializer<'de>,
112{
113    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
114    if let Some(s) = s {
115        let numbers = s
116            .split(',')
117            .map(|s| s.trim().parse())
118            .collect::<Result<Vec<u64>, _>>()
119            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
120        Ok(Some(numbers?))
121    } else {
122        Ok(None)
123    }
124}
125
126pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
127where
128    D: de::Deserializer<'de>,
129{
130    let s: String = de::Deserialize::deserialize(deserializer)?;
131    let s = s.to_ascii_lowercase();
132    match s.as_str() {
133        "true" => Ok(true),
134        "false" => Ok(false),
135        _ => Err(de::Error::invalid_value(
136            de::Unexpected::Str(&s),
137            &"true or false",
138        )),
139    }
140}
141
142pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
143    deserializer: D,
144) -> std::result::Result<Option<bool>, D::Error>
145where
146    D: de::Deserializer<'de>,
147{
148    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
149    if let Some(s) = s {
150        let s = s.to_ascii_lowercase();
151        match s.as_str() {
152            "true" => Ok(Some(true)),
153            "false" => Ok(Some(false)),
154            _ => Err(de::Error::invalid_value(
155                de::Unexpected::Str(&s),
156                &"true or false",
157            )),
158        }
159    } else {
160        Ok(None)
161    }
162}
163
164pub(crate) fn deserialize_duration_from_string<'de, D>(
165    deserializer: D,
166) -> Result<Duration, D::Error>
167where
168    D: de::Deserializer<'de>,
169{
170    let s: String = de::Deserialize::deserialize(deserializer)?;
171    parse_std(&s).map_err(|_| de::Error::invalid_value(
172        de::Unexpected::Str(&s),
173        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
174    ))
175}
176
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// Ensures that `src/connector/with_options_source.yaml`, `with_options_sink.yaml` and
    /// `with_options_connection.yaml` match the output of the corresponding
    /// `generate_with_options_yaml_*` generators. Developers should run
    /// `./risedev generate-with-options` to update them if this test fails.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());

        expect_file!("../with_options_connection.yaml")
            .assert_eq(&generate_with_options_yaml_connection());
    }

    /// Ensures that the generated `src/allow_alter_on_fly_fields.rs` matches the output of
    /// `generate_allow_alter_on_fly_fields_combined`.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        expect_file!("../src/allow_alter_on_fly_fields.rs")
            .assert_eq(&generate_allow_alter_on_fly_fields_combined());
    }

    /// Test some serde behavior we rely on.
    mod serde {
        // The test structs' fields are only read through derived `Debug` impls, which
        // dead-code analysis ignores.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // test deny_unknown_fields and flatten

        // TL;DR: deny_unknown_fields
        // - doesn't work with flatten map
        // - can work with flatten struct
        // - doesn't work with nested flatten struct (This makes a flatten struct behave like a flatten map)

        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // with `deny_unknown_fields`, we can't flatten ONLY a map
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // but can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // unknown fields can be denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both struct and map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // This error above is a little funny, since even if we use string, it will still fail.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        #[test]
        fn test_inner_deny() {
            // no outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple flatten, all of them will be used.
            // Also, with outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate...
            expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
    }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}