risingwave_connector/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(box_into_inner)]
23#![feature(type_alias_impl_trait)]
24#![feature(associated_type_defaults)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(iter_from_coroutine)]
27#![feature(if_let_guard)]
28#![feature(iterator_try_collect)]
29#![feature(try_blocks)]
30#![feature(error_generic_member_access)]
31#![feature(negative_impls)]
32#![feature(register_tool)]
33#![feature(assert_matches)]
34#![feature(never_type)]
35#![feature(map_try_insert)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39#![feature(custom_inner_attributes)]
40#![feature(iter_array_chunks)]
41
42use std::time::Duration;
43
44use duration_str::parse_std;
45use serde::de;
46
47pub mod aws_utils;
48
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
71pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
72pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
73pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
74pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
75
76pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
77where
78    D: de::Deserializer<'de>,
79{
80    let s: String = de::Deserialize::deserialize(deserializer)?;
81    s.parse().map_err(|_| {
82        de::Error::invalid_value(
83            de::Unexpected::Str(&s),
84            &"integer greater than or equal to 0",
85        )
86    })
87}
88
89pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
90    deserializer: D,
91) -> std::result::Result<Option<Vec<String>>, D::Error>
92where
93    D: de::Deserializer<'de>,
94{
95    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
96    if let Some(s) = s {
97        let s = s.to_ascii_lowercase();
98        let s = s.split(',').map(|s| s.trim().to_owned()).collect();
99        Ok(Some(s))
100    } else {
101        Ok(None)
102    }
103}
104
105pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
106    deserializer: D,
107) -> std::result::Result<Option<Vec<u64>>, D::Error>
108where
109    D: de::Deserializer<'de>,
110{
111    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
112    if let Some(s) = s {
113        let numbers = s
114            .split(',')
115            .map(|s| s.trim().parse())
116            .collect::<Result<Vec<u64>, _>>()
117            .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
118        Ok(Some(numbers?))
119    } else {
120        Ok(None)
121    }
122}
123
124pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
125where
126    D: de::Deserializer<'de>,
127{
128    let s: String = de::Deserialize::deserialize(deserializer)?;
129    let s = s.to_ascii_lowercase();
130    match s.as_str() {
131        "true" => Ok(true),
132        "false" => Ok(false),
133        _ => Err(de::Error::invalid_value(
134            de::Unexpected::Str(&s),
135            &"true or false",
136        )),
137    }
138}
139
140pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
141    deserializer: D,
142) -> std::result::Result<Option<bool>, D::Error>
143where
144    D: de::Deserializer<'de>,
145{
146    let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
147    if let Some(s) = s {
148        let s = s.to_ascii_lowercase();
149        match s.as_str() {
150            "true" => Ok(Some(true)),
151            "false" => Ok(Some(false)),
152            _ => Err(de::Error::invalid_value(
153                de::Unexpected::Str(&s),
154                &"true or false",
155            )),
156        }
157    } else {
158        Ok(None)
159    }
160}
161
162pub(crate) fn deserialize_duration_from_string<'de, D>(
163    deserializer: D,
164) -> Result<Duration, D::Error>
165where
166    D: de::Deserializer<'de>,
167{
168    let s: String = de::Deserialize::deserialize(deserializer)?;
169    parse_std(&s).map_err(|_| de::Error::invalid_value(
170        de::Unexpected::Str(&s),
171        &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
172    ))
173}
174
175#[cfg(test)]
176mod tests {
177    use expect_test::expect_file;
178
179    use crate::with_options_test::{
180        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
181        generate_with_options_yaml_sink, generate_with_options_yaml_source,
182    };
183
184    /// This test ensures that `src/connector/with_options.yaml` is up-to-date with the default values specified
185    /// in this file. Developer should run `./risedev generate-with-options` to update it if this
186    /// test fails.
187    #[test]
188    fn test_with_options_yaml_up_to_date() {
189        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
190
191        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
192
193        expect_file!("../with_options_connection.yaml")
194            .assert_eq(&generate_with_options_yaml_connection());
195    }
196
197    /// This test ensures that the `allow_alter_on_fly` fields Rust file is up-to-date.
198    #[test]
199    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
200        expect_file!("../src/allow_alter_on_fly_fields.rs")
201            .assert_eq(&generate_allow_alter_on_fly_fields_combined());
202    }
203
204    /// Test some serde behavior we rely on.
205    mod serde {
206        #![expect(dead_code)]
207
208        use std::collections::BTreeMap;
209
210        use expect_test::expect;
211        use serde::Deserialize;
212
213        // test deny_unknown_fields and flatten
214
215        // TL;DR: deny_unknown_fields
216        // - doesn't work with flatten map
217        // - can work with flatten struct
218        // - doesn't work with nested flatten struct (This makes a flatten struct behave like a flatten map)
219
        /// Pins how `#[serde(deny_unknown_fields)]` on the *outer* struct interacts with
        /// `#[serde(flatten)]` on a map, on a struct, and on both at once.
        ///
        /// The snapshots record the exact `serde_json` results, including error positions,
        /// so the whitespace inside the JSON literals is significant.
        #[test]
        fn test_outer_deny() {
            // Outer deny + a single flattened map.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            // Outer deny + a single flattened struct.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            // Outer deny + both a flattened map and a flattened struct.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            // The flattened payload: two optional string fields, no deny attribute of its own.
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // with `deny_unknown_fields`, we can't flatten ONLY a map
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // but can flatten a struct!
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // unknown fields can be denied.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // When both struct and map are flattened, the map also works...
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            // An unknown key with a non-string value fails while filling the `String` map.
            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // This error above is a little funny, since even if we use string, it will still fail.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }
325
        /// Pins that `#[serde(deny_unknown_fields)]` placed only on the flattened *inner*
        /// struct does not reject unknown keys: the input with an extra `unknown` key
        /// still deserializes successfully.
        #[test]
        fn test_inner_deny() {
            // no outer deny now.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            // Only the inner struct carries `deny_unknown_fields`.
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // unknown fields cannot be denied.
            // I think this is because `deserialize_struct` is called, and required fields are passed.
            // Other fields are left for the outer struct to consume.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
362
        /// Pins the result of combining several `#[serde(flatten)]` fields (two structs and
        /// two maps) in one struct: every flattened field is populated, and both maps end
        /// up with the same leftover key.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// struct will "consume" the used fields!
                #[serde(flatten)]
                flatten_struct: Inner1,

                /// map will keep the unknown fields!
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; the enclosing module expects `dead_code`.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // When there are multiple flatten, all of them will be used.
            // Also, with outer `flatten`, the inner `deny_unknown_fields` is ignored.
            expect![[r#"
            Ok(
                Foo {
                    flatten_struct: Inner1 {
                        a: Some(
                            "b",
                        ),
                        b: None,
                    },
                    flatten_map1: {
                        "c": "d",
                    },
                    flatten_map2: {
                        "c": "d",
                    },
                    flatten_struct2: Inner2 {
                        c: Some(
                            "d",
                        ),
                    },
                },
            )
        "#]]
            .assert_debug_eq(&foo2);
        }
430
        /// Pins what happens when a flattened struct itself contains a flattened field.
        ///
        /// With an outer `deny_unknown_fields`, even the known key `a` is reported as
        /// unknown. Without it, a sibling flattened map still sees every key, including
        /// the ones the flattened struct used.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            // `Inner` itself flattens another struct, which is what triggers the behavior below.
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // This is very unfortunate...
            expect_test::expect![[r#"
            Err(
                Error("unknown field `a`", line: 1, column: 27),
            )
        "#]]
            .assert_debug_eq(&foo);

            // Actually, the nested `flatten` makes the struct behave like a map.
            // Let's remove `deny_unknown_fields` and see
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                /// We can see the fields of `inner` are not consumed.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }
497
        /// Pins how `#[serde(flatten)]` behaves on `Option<…>` fields: which ones become
        /// `Some` and which become `None` for an input that contains `a`, `c` and `f`
        /// but not `d`.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                /// flatten option struct can still consume the field
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                /// flatten option map is always `Some`
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                /// flatten option struct is `None` if the required field is absent
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                /// flatten option struct is `Some` if the required field is present and optional field is absent.
                /// Note: if all fields are optional, the struct is always `Some`
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            // All fields optional.
            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; the enclosing module expects `dead_code`.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            // Required field `d` is missing from the input below.
            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            // Required field `f` is present in the input below.
            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
        "a": "b", "c": "d", "f": "g"
     }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
575    }
576}