1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![feature(map_try_insert)]
37#![register_tool(rw)]
38#![recursion_limit = "256"]
39#![feature(min_specialization)]
40
41use std::time::Duration;
42
43use duration_str::parse_std;
44use serde::de;
45
46pub mod aws_utils;
47
48#[rustfmt::skip]
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
71pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
72where
73 D: de::Deserializer<'de>,
74{
75 let s: String = de::Deserialize::deserialize(deserializer)?;
76 s.parse().map_err(|_| {
77 de::Error::invalid_value(
78 de::Unexpected::Str(&s),
79 &"integer greater than or equal to 0",
80 )
81 })
82}
83
84pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
85 deserializer: D,
86) -> std::result::Result<Option<Vec<String>>, D::Error>
87where
88 D: de::Deserializer<'de>,
89{
90 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
91 if let Some(s) = s {
92 let s = s.to_ascii_lowercase();
93 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
94 Ok(Some(s))
95 } else {
96 Ok(None)
97 }
98}
99
100pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
101 deserializer: D,
102) -> std::result::Result<Option<Vec<u64>>, D::Error>
103where
104 D: de::Deserializer<'de>,
105{
106 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
107 if let Some(s) = s {
108 let numbers = s
109 .split(',')
110 .map(|s| s.trim().parse())
111 .collect::<Result<Vec<u64>, _>>()
112 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
113 Ok(Some(numbers?))
114 } else {
115 Ok(None)
116 }
117}
118
119pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
120where
121 D: de::Deserializer<'de>,
122{
123 let s: String = de::Deserialize::deserialize(deserializer)?;
124 let s = s.to_ascii_lowercase();
125 match s.as_str() {
126 "true" => Ok(true),
127 "false" => Ok(false),
128 _ => Err(de::Error::invalid_value(
129 de::Unexpected::Str(&s),
130 &"true or false",
131 )),
132 }
133}
134
135pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
136 deserializer: D,
137) -> std::result::Result<Option<bool>, D::Error>
138where
139 D: de::Deserializer<'de>,
140{
141 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
142 if let Some(s) = s {
143 let s = s.to_ascii_lowercase();
144 match s.as_str() {
145 "true" => Ok(Some(true)),
146 "false" => Ok(Some(false)),
147 _ => Err(de::Error::invalid_value(
148 de::Unexpected::Str(&s),
149 &"true or false",
150 )),
151 }
152 } else {
153 Ok(None)
154 }
155}
156
157pub(crate) fn deserialize_duration_from_string<'de, D>(
158 deserializer: D,
159) -> Result<Duration, D::Error>
160where
161 D: de::Deserializer<'de>,
162{
163 let s: String = de::Deserialize::deserialize(deserializer)?;
164 parse_std(&s).map_err(|_| de::Error::invalid_value(
165 de::Unexpected::Str(&s),
166 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
167 ))
168}
169
#[cfg(test)]
mod tests {
    use expect_test::expect_file;

    use crate::with_options_test::{
        generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_sink,
        generate_with_options_yaml_source,
    };

    /// Checks that the checked-in `with_options_source.yaml` and
    /// `with_options_sink.yaml` files match what the generators produce.
    #[test]
    fn test_with_options_yaml_up_to_date() {
        let source_yaml = generate_with_options_yaml_source();
        expect_file!("../with_options_source.yaml").assert_eq(&source_yaml);

        let sink_yaml = generate_with_options_yaml_sink();
        expect_file!("../with_options_sink.yaml").assert_eq(&sink_yaml);
    }

    /// Checks that the checked-in `allow_alter_on_fly_fields.rs` matches what
    /// the generator produces.
    #[test]
    fn test_allow_alter_on_fly_fields_rust_up_to_date() {
        let generated = generate_allow_alter_on_fly_fields_combined();
        expect_file!("../src/allow_alter_on_fly_fields.rs").assert_eq(&generated);
    }

    /// Snapshot tests pinning how serde's `flatten` and `deny_unknown_fields`
    /// attributes interact, as observed through `serde_json`.
    mod serde {
        // The structs below exist only to be deserialized and debug-printed;
        // their fields are never read and some helpers are never constructed.
        #![expect(dead_code)]

        use std::collections::BTreeMap;

        use expect_test::expect;
        use serde::Deserialize;

        /// `deny_unknown_fields` on the outer struct, combined with flattening
        /// a map, a struct, or both.
        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            // NOTE: the first snapshot records `line: 3, column: 13`, so the
            // whitespace inside this literal must not be changed.
            let json = r#"{
                "a": "b"
            }"#;

            // Flattening only a map: the single key is reported as unknown.
            let map_only: Result<FlattenMap, _> = serde_json::from_str(json);
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&map_only);

            // Flattening only a struct: the known field is accepted.
            let struct_only: Result<FlattenStruct, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&struct_only);

            // ...and a key that is not part of the flattened struct is rejected.
            let struct_only_unknown: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&struct_only_unknown);

            // Flattening both a map and a struct: parsing succeeds and the
            // struct's field also shows up in the map.
            let both: Result<FlattenBoth, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&both);

            // A non-string unknown value fails with a type error (the map is
            // `String -> String`)...
            let both_unknown_int: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&both_unknown_int);

            // ...while a string-valued unknown key is rejected as unknown.
            let both_unknown_str: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&both_unknown_str);
        }

        /// `deny_unknown_fields` placed on the flattened (inner) struct only.
        #[test]
        fn test_inner_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;

            // The unknown key is silently accepted.
            let parsed: Result<FlattenStruct, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&parsed);
        }

        /// Several flattened structs and maps inside the same outer struct.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Foo {
                #[serde(flatten)]
                flatten_struct: Inner1,

                #[serde(flatten)]
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;

            // Both maps receive the same leftover key `c` (but not `a`, which
            // went to the first struct), and the trailing struct still gets `c`.
            let parsed: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Inner1 {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                        flatten_map1: {
                            "c": "d",
                        },
                        flatten_map2: {
                            "c": "d",
                        },
                        flatten_struct2: Inner2 {
                            c: Some(
                                "d",
                            ),
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&parsed);
        }

        /// A flattened struct that itself contains a flattened struct.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                #[serde(flatten)]
                map: BTreeMap<String, String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            // With `deny_unknown_fields` on the outer struct, even the known
            // field `a` is reported as unknown.
            let denied: Result<Outer, _> = serde_json::from_str(json);
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&denied);

            // Without it, a sibling flattened map sees every key, including
            // the one the nested struct already used.
            let with_map: Result<Outer2, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&with_map);
        }

        /// Flattening `Option<_>` fields.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            #[derive(Deserialize, Debug)]
            struct Foo {
                #[serde(flatten)]
                flatten_struct: Option<Inner1>,

                #[serde(flatten)]
                flatten_map1: Option<BTreeMap<String, String>>,

                #[serde(flatten)]
                flatten_struct2: Option<Inner2>,

                #[serde(flatten)]
                flatten_struct3: Option<Inner3>,
            }

            let json = r#"{
                "a": "b", "c": "d", "f": "g"
            }"#;

            // `Inner2` lacks its required field `d`, so it becomes `None`;
            // `Inner3` has its required field `f`, so it becomes `Some`.
            let parsed: Result<Foo, _> = serde_json::from_str(json);
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&parsed);
        }
    }
}