1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39
40use std::time::Duration;
41
42use duration_str::parse_std;
43use serde::de;
44
45pub mod aws_utils;
46pub mod error;
47mod macros;
48
49pub mod parser;
50pub mod schema;
51pub mod sink;
52pub mod source;
53
54pub mod connector_common;
55
56pub use paste::paste;
57pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
58
59mod with_options;
60pub use with_options::{WithOptionsSecResolved, WithPropertiesExt};
61
62#[cfg(test)]
63mod with_options_test;
64
65pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
66where
67 D: de::Deserializer<'de>,
68{
69 let s: String = de::Deserialize::deserialize(deserializer)?;
70 s.parse().map_err(|_| {
71 de::Error::invalid_value(
72 de::Unexpected::Str(&s),
73 &"integer greater than or equal to 0",
74 )
75 })
76}
77
78pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
79 deserializer: D,
80) -> std::result::Result<Option<Vec<String>>, D::Error>
81where
82 D: de::Deserializer<'de>,
83{
84 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
85 if let Some(s) = s {
86 let s = s.to_ascii_lowercase();
87 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
88 Ok(Some(s))
89 } else {
90 Ok(None)
91 }
92}
93
94pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
95 deserializer: D,
96) -> std::result::Result<Option<Vec<u64>>, D::Error>
97where
98 D: de::Deserializer<'de>,
99{
100 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
101 if let Some(s) = s {
102 let numbers = s
103 .split(',')
104 .map(|s| s.trim().parse())
105 .collect::<Result<Vec<u64>, _>>()
106 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
107 Ok(Some(numbers?))
108 } else {
109 Ok(None)
110 }
111}
112
113pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
114where
115 D: de::Deserializer<'de>,
116{
117 let s: String = de::Deserialize::deserialize(deserializer)?;
118 let s = s.to_ascii_lowercase();
119 match s.as_str() {
120 "true" => Ok(true),
121 "false" => Ok(false),
122 _ => Err(de::Error::invalid_value(
123 de::Unexpected::Str(&s),
124 &"true or false",
125 )),
126 }
127}
128
129pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
130 deserializer: D,
131) -> std::result::Result<Option<bool>, D::Error>
132where
133 D: de::Deserializer<'de>,
134{
135 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
136 if let Some(s) = s {
137 let s = s.to_ascii_lowercase();
138 match s.as_str() {
139 "true" => Ok(Some(true)),
140 "false" => Ok(Some(false)),
141 _ => Err(de::Error::invalid_value(
142 de::Unexpected::Str(&s),
143 &"true or false",
144 )),
145 }
146 } else {
147 Ok(None)
148 }
149}
150
151pub(crate) fn deserialize_duration_from_string<'de, D>(
152 deserializer: D,
153) -> Result<Duration, D::Error>
154where
155 D: de::Deserializer<'de>,
156{
157 let s: String = de::Deserialize::deserialize(deserializer)?;
158 parse_std(&s).map_err(|_| de::Error::invalid_value(
159 de::Unexpected::Str(&s),
160 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
161 ))
162}
163
164#[cfg(test)]
165mod tests {
166 use expect_test::expect_file;
167
168 use crate::with_options_test::{
169 generate_with_options_yaml_sink, generate_with_options_yaml_source,
170 };
171
172 #[test]
176 fn test_with_options_yaml_up_to_date() {
177 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
178
179 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
180 }
181
182 mod serde {
184 #![expect(dead_code)]
185
186 use std::collections::BTreeMap;
187
188 use expect_test::expect;
189 use serde::Deserialize;
190
191 #[test]
199 fn test_outer_deny() {
200 #[derive(Deserialize, Debug)]
201 #[serde(deny_unknown_fields)]
202 struct FlattenMap {
203 #[serde(flatten)]
204 flatten: BTreeMap<String, String>,
205 }
206 #[derive(Deserialize, Debug)]
207 #[serde(deny_unknown_fields)]
208 struct FlattenStruct {
209 #[serde(flatten)]
210 flatten_struct: Inner,
211 }
212
213 #[derive(Deserialize, Debug)]
214 #[serde(deny_unknown_fields)]
215 struct FlattenBoth {
216 #[serde(flatten)]
217 flatten: BTreeMap<String, String>,
218 #[serde(flatten)]
219 flatten_struct: Inner,
220 }
221
222 #[derive(Deserialize, Debug)]
223 struct Inner {
224 a: Option<String>,
225 b: Option<String>,
226 }
227
228 let json = r#"{
229 "a": "b"
230 }"#;
231 let foo: Result<FlattenMap, _> = serde_json::from_str(json);
232 let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
233 let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);
234
235 expect![[r#"
237 Err(
238 Error("unknown field `a`", line: 3, column: 13),
239 )
240 "#]]
241 .assert_debug_eq(&foo);
242
243 expect![[r#"
245 Ok(
246 FlattenStruct {
247 flatten_struct: Inner {
248 a: Some(
249 "b",
250 ),
251 b: None,
252 },
253 },
254 )
255 "#]]
256 .assert_debug_eq(&foo1);
257 let foo11: Result<FlattenStruct, _> =
259 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
260 expect_test::expect![[r#"
261 Err(
262 Error("unknown field `unknown`", line: 1, column: 25),
263 )
264 "#]]
265 .assert_debug_eq(&foo11);
266
267 expect![[r#"
269 Ok(
270 FlattenBoth {
271 flatten: {
272 "a": "b",
273 },
274 flatten_struct: Inner {
275 a: Some(
276 "b",
277 ),
278 b: None,
279 },
280 },
281 )
282 "#]]
283 .assert_debug_eq(&foo2);
284
285 let foo21: Result<FlattenBoth, _> =
286 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
287 expect_test::expect![[r#"
288 Err(
289 Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
290 )
291 "#]]
292 .assert_debug_eq(&foo21);
293 let foo22: Result<FlattenBoth, _> =
295 serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
296 expect_test::expect![[r#"
297 Err(
298 Error("unknown field `unknown`", line: 1, column: 27),
299 )
300 "#]]
301 .assert_debug_eq(&foo22);
302 }
303
304 #[test]
305 fn test_inner_deny() {
306 #[derive(Deserialize, Debug)]
308 struct FlattenStruct {
309 #[serde(flatten)]
310 flatten_struct: Inner,
311 }
312 #[derive(Deserialize, Debug)]
313 #[serde(deny_unknown_fields)]
314 struct Inner {
315 a: Option<String>,
316 b: Option<String>,
317 }
318
319 let json = r#"{
320 "a": "b", "unknown":1
321 }"#;
322 let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
323 expect_test::expect![[r#"
327 Ok(
328 FlattenStruct {
329 flatten_struct: Inner {
330 a: Some(
331 "b",
332 ),
333 b: None,
334 },
335 },
336 )
337 "#]]
338 .assert_debug_eq(&foo);
339 }
340
341 #[test]
342 fn test_multiple_flatten() {
343 #[derive(Deserialize, Debug)]
344 struct Foo {
345 #[serde(flatten)]
347 flatten_struct: Inner1,
348
349 #[serde(flatten)]
351 flatten_map1: BTreeMap<String, String>,
352
353 #[serde(flatten)]
354 flatten_map2: BTreeMap<String, String>,
355
356 #[serde(flatten)]
357 flatten_struct2: Inner2,
358 }
359
360 #[derive(Deserialize, Debug)]
361 #[serde(deny_unknown_fields)]
362 struct Inner1 {
363 a: Option<String>,
364 b: Option<String>,
365 }
366 #[derive(Deserialize, Debug)]
367 struct Inner11 {
368 c: Option<String>,
369 }
370 #[derive(Deserialize, Debug)]
371 #[serde(deny_unknown_fields)]
372 struct Inner2 {
373 c: Option<String>,
374 }
375
376 let json = r#"{
377 "a": "b", "c":"d"
378 }"#;
379 let foo2: Result<Foo, _> = serde_json::from_str(json);
380
381 expect![[r#"
384 Ok(
385 Foo {
386 flatten_struct: Inner1 {
387 a: Some(
388 "b",
389 ),
390 b: None,
391 },
392 flatten_map1: {
393 "c": "d",
394 },
395 flatten_map2: {
396 "c": "d",
397 },
398 flatten_struct2: Inner2 {
399 c: Some(
400 "d",
401 ),
402 },
403 },
404 )
405 "#]]
406 .assert_debug_eq(&foo2);
407 }
408
409 #[test]
410 fn test_nested_flatten() {
411 #[derive(Deserialize, Debug)]
412 #[serde(deny_unknown_fields)]
413 struct Outer {
414 #[serde(flatten)]
415 inner: Inner,
416 }
417
418 #[derive(Deserialize, Debug)]
419 struct Inner {
420 a: Option<String>,
421 b: Option<String>,
422 #[serde(flatten)]
423 nested: InnerInner,
424 }
425
426 #[derive(Deserialize, Debug)]
427 struct InnerInner {
428 c: Option<String>,
429 }
430
431 let json = r#"{ "a": "b", "unknown":"1" }"#;
432
433 let foo: Result<Outer, _> = serde_json::from_str(json);
434
435 expect_test::expect![[r#"
437 Err(
438 Error("unknown field `a`", line: 1, column: 27),
439 )
440 "#]]
441 .assert_debug_eq(&foo);
442
443 #[derive(Deserialize, Debug)]
446 struct Outer2 {
447 #[serde(flatten)]
448 inner: Inner,
449 #[serde(flatten)]
451 map: BTreeMap<String, String>,
452 }
453 let foo2: Result<Outer2, _> = serde_json::from_str(json);
454 expect_test::expect![[r#"
455 Ok(
456 Outer2 {
457 inner: Inner {
458 a: Some(
459 "b",
460 ),
461 b: None,
462 nested: InnerInner {
463 c: None,
464 },
465 },
466 map: {
467 "a": "b",
468 "unknown": "1",
469 },
470 },
471 )
472 "#]]
473 .assert_debug_eq(&foo2);
474 }
475
476 #[test]
477 fn test_flatten_option() {
478 #[derive(Deserialize, Debug)]
479 struct Foo {
480 #[serde(flatten)]
482 flatten_struct: Option<Inner1>,
483
484 #[serde(flatten)]
486 flatten_map1: Option<BTreeMap<String, String>>,
487
488 #[serde(flatten)]
490 flatten_struct2: Option<Inner2>,
491
492 #[serde(flatten)]
495 flatten_struct3: Option<Inner3>,
496 }
497
498 #[derive(Deserialize, Debug)]
499 struct Inner1 {
500 a: Option<String>,
501 b: Option<String>,
502 }
503 #[derive(Deserialize, Debug)]
504 struct Inner11 {
505 c: Option<String>,
506 }
507
508 #[derive(Deserialize, Debug)]
509 struct Inner2 {
510 c: Option<String>,
511 d: String,
512 }
513
514 #[derive(Deserialize, Debug)]
515 struct Inner3 {
516 e: Option<String>,
517 f: String,
518 }
519
520 let json = r#"{
521 "a": "b", "c": "d", "f": "g"
522 }"#;
523 let foo: Result<Foo, _> = serde_json::from_str(json);
524 expect![[r#"
525 Ok(
526 Foo {
527 flatten_struct: Some(
528 Inner1 {
529 a: Some(
530 "b",
531 ),
532 b: None,
533 },
534 ),
535 flatten_map1: Some(
536 {
537 "c": "d",
538 "f": "g",
539 },
540 ),
541 flatten_struct2: None,
542 flatten_struct3: Some(
543 Inner3 {
544 e: None,
545 f: "g",
546 },
547 ),
548 },
549 )
550 "#]]
551 .assert_debug_eq(&foo);
552 }
553 }
554}