1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39
40use std::time::Duration;
41
42use duration_str::parse_std;
43use serde::de;
44
45pub mod aws_utils;
46mod enforce_secret;
47pub mod error;
48mod macros;
49
50pub mod parser;
51pub mod schema;
52pub mod sink;
53pub mod source;
54
55pub mod connector_common;
56
57pub use paste::paste;
58pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
59
60mod with_options;
61pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
62
63#[cfg(test)]
64mod with_options_test;
65
66pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
67where
68 D: de::Deserializer<'de>,
69{
70 let s: String = de::Deserialize::deserialize(deserializer)?;
71 s.parse().map_err(|_| {
72 de::Error::invalid_value(
73 de::Unexpected::Str(&s),
74 &"integer greater than or equal to 0",
75 )
76 })
77}
78
79pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
80 deserializer: D,
81) -> std::result::Result<Option<Vec<String>>, D::Error>
82where
83 D: de::Deserializer<'de>,
84{
85 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
86 if let Some(s) = s {
87 let s = s.to_ascii_lowercase();
88 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
89 Ok(Some(s))
90 } else {
91 Ok(None)
92 }
93}
94
95pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
96 deserializer: D,
97) -> std::result::Result<Option<Vec<u64>>, D::Error>
98where
99 D: de::Deserializer<'de>,
100{
101 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
102 if let Some(s) = s {
103 let numbers = s
104 .split(',')
105 .map(|s| s.trim().parse())
106 .collect::<Result<Vec<u64>, _>>()
107 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
108 Ok(Some(numbers?))
109 } else {
110 Ok(None)
111 }
112}
113
114pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
115where
116 D: de::Deserializer<'de>,
117{
118 let s: String = de::Deserialize::deserialize(deserializer)?;
119 let s = s.to_ascii_lowercase();
120 match s.as_str() {
121 "true" => Ok(true),
122 "false" => Ok(false),
123 _ => Err(de::Error::invalid_value(
124 de::Unexpected::Str(&s),
125 &"true or false",
126 )),
127 }
128}
129
130pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
131 deserializer: D,
132) -> std::result::Result<Option<bool>, D::Error>
133where
134 D: de::Deserializer<'de>,
135{
136 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
137 if let Some(s) = s {
138 let s = s.to_ascii_lowercase();
139 match s.as_str() {
140 "true" => Ok(Some(true)),
141 "false" => Ok(Some(false)),
142 _ => Err(de::Error::invalid_value(
143 de::Unexpected::Str(&s),
144 &"true or false",
145 )),
146 }
147 } else {
148 Ok(None)
149 }
150}
151
152pub(crate) fn deserialize_duration_from_string<'de, D>(
153 deserializer: D,
154) -> Result<Duration, D::Error>
155where
156 D: de::Deserializer<'de>,
157{
158 let s: String = de::Deserialize::deserialize(deserializer)?;
159 parse_std(&s).map_err(|_| de::Error::invalid_value(
160 de::Unexpected::Str(&s),
161 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
162 ))
163}
164
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_with_options_yaml_sink, generate_with_options_yaml_source,
    };

    /// Checks that the checked-in `with_options_source.yaml` and
    /// `with_options_sink.yaml` files match the YAML produced by the
    /// generators in `with_options_test`. If this fails, the YAML files are
    /// out of date relative to the generators and need to be regenerated.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());

        expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
    }

    /// Snapshot tests pinning down how serde's `flatten` and
    /// `deny_unknown_fields` attributes interact.
    mod serde {
        // Several helper structs and fields below exist only to be deserialized
        // and printed via `Debug` (and `Inner11` is never used at all), so
        // dead-code warnings are expected here.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        // `deny_unknown_fields` on the OUTER struct combined with `flatten`.
        //
        // Summary of what the snapshots below record:
        // - flattening only a map: every key is rejected as unknown;
        // - flattening only a struct: works, and unknown keys are rejected;
        // - flattening both a map and a struct: known keys are accepted (and
        //   show up in both), but an extra key is still rejected.
        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // Outer `deny_unknown_fields` + a lone flattened map: the key is rejected.
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // Outer `deny_unknown_fields` + a flattened struct: known fields are accepted.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // ...and a key that is not a field of the flattened struct is rejected.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // Flattening both a map and a struct: the known key appears in BOTH of them.
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            // With both flattened, a non-string extra value fails while filling the
            // `BTreeMap<String, String>` (type error, not an "unknown field" error).
            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // With a string extra value the map type-checks, but the key is then
            // still rejected as an unknown field.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }

        // `deny_unknown_fields` on the INNER (flattened) struct only.
        #[test]
        fn test_inner_deny() {
            // Note: no `deny_unknown_fields` on the outer struct here.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // The snapshot records that the inner `deny_unknown_fields` has no
            // effect when the struct is flattened: the `unknown` key is silently
            // ignored and deserialization succeeds.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }

        // Several flattened fields (structs and maps) side by side in one struct.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                // A flattened struct declared before the maps.
                #[serde(flatten)]
                flatten_struct: Inner1,

                // Two flattened maps in a row; the snapshot shows both receive the same keys.
                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // Recorded behavior: key `a` goes to `Inner1` and does NOT appear in the
            // maps, while key `c` appears in both maps AND in `Inner2`, which is
            // declared after them.
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Inner1 {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                        flatten_map1: {
                            "c": "d",
                        },
                        flatten_map2: {
                            "c": "d",
                        },
                        flatten_struct2: Inner2 {
                            c: Some(
                                "d",
                            ),
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        // A flattened struct that itself contains another flattened struct.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // Surprising recorded behavior: the error names the KNOWN field `a`,
            // not `unknown`, when the flattened struct has a nested flatten.
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `a`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo);

            // Same nested-flatten struct, but without `deny_unknown_fields` on the
            // outer struct and with a flattened map after it.
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                // The snapshot shows this map receives ALL keys, including `a`,
                // which `inner` also received.
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }

        // Flattening `Option<T>` fields.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                // All fields of `Inner1` are optional; the snapshot shows `Some`.
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                // An optional flattened map; the snapshot shows `Some` with the remaining keys.
                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                // `Inner2` has a required field `d` that the input lacks; the snapshot shows `None`.
                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                // `Inner3` has a required field `f` that the input provides; the
                // snapshot shows `Some`, even though `f` also appears in the map above.
                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
                "a": "b", "c": "d", "f": "g"
            }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
    }
}