1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(array_chunks)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(let_chains)]
23#![feature(box_into_inner)]
24#![feature(type_alias_impl_trait)]
25#![feature(associated_type_defaults)]
26#![feature(impl_trait_in_assoc_type)]
27#![feature(iter_from_coroutine)]
28#![feature(if_let_guard)]
29#![feature(iterator_try_collect)]
30#![feature(try_blocks)]
31#![feature(error_generic_member_access)]
32#![feature(negative_impls)]
33#![feature(register_tool)]
34#![feature(assert_matches)]
35#![feature(never_type)]
36#![feature(map_try_insert)]
37#![register_tool(rw)]
38#![recursion_limit = "256"]
39#![feature(min_specialization)]
40
41use std::time::Duration;
42
43use duration_str::parse_std;
44use serde::de;
45
46pub mod aws_utils;
47
48#[rustfmt::skip]
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
71pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
72
73pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
74where
75 D: de::Deserializer<'de>,
76{
77 let s: String = de::Deserialize::deserialize(deserializer)?;
78 s.parse().map_err(|_| {
79 de::Error::invalid_value(
80 de::Unexpected::Str(&s),
81 &"integer greater than or equal to 0",
82 )
83 })
84}
85
86pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
87 deserializer: D,
88) -> std::result::Result<Option<Vec<String>>, D::Error>
89where
90 D: de::Deserializer<'de>,
91{
92 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
93 if let Some(s) = s {
94 let s = s.to_ascii_lowercase();
95 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
96 Ok(Some(s))
97 } else {
98 Ok(None)
99 }
100}
101
102pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
103 deserializer: D,
104) -> std::result::Result<Option<Vec<u64>>, D::Error>
105where
106 D: de::Deserializer<'de>,
107{
108 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
109 if let Some(s) = s {
110 let numbers = s
111 .split(',')
112 .map(|s| s.trim().parse())
113 .collect::<Result<Vec<u64>, _>>()
114 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
115 Ok(Some(numbers?))
116 } else {
117 Ok(None)
118 }
119}
120
121pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
122where
123 D: de::Deserializer<'de>,
124{
125 let s: String = de::Deserialize::deserialize(deserializer)?;
126 let s = s.to_ascii_lowercase();
127 match s.as_str() {
128 "true" => Ok(true),
129 "false" => Ok(false),
130 _ => Err(de::Error::invalid_value(
131 de::Unexpected::Str(&s),
132 &"true or false",
133 )),
134 }
135}
136
137pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
138 deserializer: D,
139) -> std::result::Result<Option<bool>, D::Error>
140where
141 D: de::Deserializer<'de>,
142{
143 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
144 if let Some(s) = s {
145 let s = s.to_ascii_lowercase();
146 match s.as_str() {
147 "true" => Ok(Some(true)),
148 "false" => Ok(Some(false)),
149 _ => Err(de::Error::invalid_value(
150 de::Unexpected::Str(&s),
151 &"true or false",
152 )),
153 }
154 } else {
155 Ok(None)
156 }
157}
158
159pub(crate) fn deserialize_duration_from_string<'de, D>(
160 deserializer: D,
161) -> Result<Duration, D::Error>
162where
163 D: de::Deserializer<'de>,
164{
165 let s: String = de::Deserialize::deserialize(deserializer)?;
166 parse_std(&s).map_err(|_| de::Error::invalid_value(
167 de::Unexpected::Str(&s),
168 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
169 ))
170}
171
172#[cfg(test)]
173mod tests {
174 use expect_test::expect_file;
175
176 use crate::with_options_test::{
177 generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
178 generate_with_options_yaml_sink, generate_with_options_yaml_source,
179 };
180
181 #[test]
185 fn test_with_options_yaml_up_to_date() {
186 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
187
188 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
189
190 expect_file!("../with_options_connection.yaml")
191 .assert_eq(&generate_with_options_yaml_connection());
192 }
193
194 #[test]
196 fn test_allow_alter_on_fly_fields_rust_up_to_date() {
197 expect_file!("../src/allow_alter_on_fly_fields.rs")
198 .assert_eq(&generate_allow_alter_on_fly_fields_combined());
199 }
200
201 mod serde {
203 #![expect(dead_code)]
204
205 use std::collections::BTreeMap;
206
207 use expect_test::expect;
208 use serde::Deserialize;
209
210 #[test]
218 fn test_outer_deny() {
219 #[derive(Deserialize, Debug)]
220 #[serde(deny_unknown_fields)]
221 struct FlattenMap {
222 #[serde(flatten)]
223 flatten: BTreeMap<String, String>,
224 }
225 #[derive(Deserialize, Debug)]
226 #[serde(deny_unknown_fields)]
227 struct FlattenStruct {
228 #[serde(flatten)]
229 flatten_struct: Inner,
230 }
231
232 #[derive(Deserialize, Debug)]
233 #[serde(deny_unknown_fields)]
234 struct FlattenBoth {
235 #[serde(flatten)]
236 flatten: BTreeMap<String, String>,
237 #[serde(flatten)]
238 flatten_struct: Inner,
239 }
240
241 #[derive(Deserialize, Debug)]
242 struct Inner {
243 a: Option<String>,
244 b: Option<String>,
245 }
246
247 let json = r#"{
248 "a": "b"
249 }"#;
250 let foo: Result<FlattenMap, _> = serde_json::from_str(json);
251 let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
252 let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);
253
254 expect![[r#"
256 Err(
257 Error("unknown field `a`", line: 3, column: 13),
258 )
259 "#]]
260 .assert_debug_eq(&foo);
261
262 expect![[r#"
264 Ok(
265 FlattenStruct {
266 flatten_struct: Inner {
267 a: Some(
268 "b",
269 ),
270 b: None,
271 },
272 },
273 )
274 "#]]
275 .assert_debug_eq(&foo1);
276 let foo11: Result<FlattenStruct, _> =
278 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
279 expect_test::expect![[r#"
280 Err(
281 Error("unknown field `unknown`", line: 1, column: 25),
282 )
283 "#]]
284 .assert_debug_eq(&foo11);
285
286 expect![[r#"
288 Ok(
289 FlattenBoth {
290 flatten: {
291 "a": "b",
292 },
293 flatten_struct: Inner {
294 a: Some(
295 "b",
296 ),
297 b: None,
298 },
299 },
300 )
301 "#]]
302 .assert_debug_eq(&foo2);
303
304 let foo21: Result<FlattenBoth, _> =
305 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
306 expect_test::expect![[r#"
307 Err(
308 Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
309 )
310 "#]]
311 .assert_debug_eq(&foo21);
312 let foo22: Result<FlattenBoth, _> =
314 serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
315 expect_test::expect![[r#"
316 Err(
317 Error("unknown field `unknown`", line: 1, column: 27),
318 )
319 "#]]
320 .assert_debug_eq(&foo22);
321 }
322
323 #[test]
324 fn test_inner_deny() {
325 #[derive(Deserialize, Debug)]
327 struct FlattenStruct {
328 #[serde(flatten)]
329 flatten_struct: Inner,
330 }
331 #[derive(Deserialize, Debug)]
332 #[serde(deny_unknown_fields)]
333 struct Inner {
334 a: Option<String>,
335 b: Option<String>,
336 }
337
338 let json = r#"{
339 "a": "b", "unknown":1
340 }"#;
341 let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
342 expect_test::expect![[r#"
346 Ok(
347 FlattenStruct {
348 flatten_struct: Inner {
349 a: Some(
350 "b",
351 ),
352 b: None,
353 },
354 },
355 )
356 "#]]
357 .assert_debug_eq(&foo);
358 }
359
360 #[test]
361 fn test_multiple_flatten() {
362 #[derive(Deserialize, Debug)]
363 struct Foo {
364 #[serde(flatten)]
366 flatten_struct: Inner1,
367
368 #[serde(flatten)]
370 flatten_map1: BTreeMap<String, String>,
371
372 #[serde(flatten)]
373 flatten_map2: BTreeMap<String, String>,
374
375 #[serde(flatten)]
376 flatten_struct2: Inner2,
377 }
378
379 #[derive(Deserialize, Debug)]
380 #[serde(deny_unknown_fields)]
381 struct Inner1 {
382 a: Option<String>,
383 b: Option<String>,
384 }
385 #[derive(Deserialize, Debug)]
386 struct Inner11 {
387 c: Option<String>,
388 }
389 #[derive(Deserialize, Debug)]
390 #[serde(deny_unknown_fields)]
391 struct Inner2 {
392 c: Option<String>,
393 }
394
395 let json = r#"{
396 "a": "b", "c":"d"
397 }"#;
398 let foo2: Result<Foo, _> = serde_json::from_str(json);
399
400 expect![[r#"
403 Ok(
404 Foo {
405 flatten_struct: Inner1 {
406 a: Some(
407 "b",
408 ),
409 b: None,
410 },
411 flatten_map1: {
412 "c": "d",
413 },
414 flatten_map2: {
415 "c": "d",
416 },
417 flatten_struct2: Inner2 {
418 c: Some(
419 "d",
420 ),
421 },
422 },
423 )
424 "#]]
425 .assert_debug_eq(&foo2);
426 }
427
428 #[test]
429 fn test_nested_flatten() {
430 #[derive(Deserialize, Debug)]
431 #[serde(deny_unknown_fields)]
432 struct Outer {
433 #[serde(flatten)]
434 inner: Inner,
435 }
436
437 #[derive(Deserialize, Debug)]
438 struct Inner {
439 a: Option<String>,
440 b: Option<String>,
441 #[serde(flatten)]
442 nested: InnerInner,
443 }
444
445 #[derive(Deserialize, Debug)]
446 struct InnerInner {
447 c: Option<String>,
448 }
449
450 let json = r#"{ "a": "b", "unknown":"1" }"#;
451
452 let foo: Result<Outer, _> = serde_json::from_str(json);
453
454 expect_test::expect![[r#"
456 Err(
457 Error("unknown field `a`", line: 1, column: 27),
458 )
459 "#]]
460 .assert_debug_eq(&foo);
461
462 #[derive(Deserialize, Debug)]
465 struct Outer2 {
466 #[serde(flatten)]
467 inner: Inner,
468 #[serde(flatten)]
470 map: BTreeMap<String, String>,
471 }
472 let foo2: Result<Outer2, _> = serde_json::from_str(json);
473 expect_test::expect![[r#"
474 Ok(
475 Outer2 {
476 inner: Inner {
477 a: Some(
478 "b",
479 ),
480 b: None,
481 nested: InnerInner {
482 c: None,
483 },
484 },
485 map: {
486 "a": "b",
487 "unknown": "1",
488 },
489 },
490 )
491 "#]]
492 .assert_debug_eq(&foo2);
493 }
494
495 #[test]
496 fn test_flatten_option() {
497 #[derive(Deserialize, Debug)]
498 struct Foo {
499 #[serde(flatten)]
501 flatten_struct: Option<Inner1>,
502
503 #[serde(flatten)]
505 flatten_map1: Option<BTreeMap<String, String>>,
506
507 #[serde(flatten)]
509 flatten_struct2: Option<Inner2>,
510
511 #[serde(flatten)]
514 flatten_struct3: Option<Inner3>,
515 }
516
517 #[derive(Deserialize, Debug)]
518 struct Inner1 {
519 a: Option<String>,
520 b: Option<String>,
521 }
522 #[derive(Deserialize, Debug)]
523 struct Inner11 {
524 c: Option<String>,
525 }
526
527 #[derive(Deserialize, Debug)]
528 struct Inner2 {
529 c: Option<String>,
530 d: String,
531 }
532
533 #[derive(Deserialize, Debug)]
534 struct Inner3 {
535 e: Option<String>,
536 f: String,
537 }
538
539 let json = r#"{
540 "a": "b", "c": "d", "f": "g"
541 }"#;
542 let foo: Result<Foo, _> = serde_json::from_str(json);
543 expect![[r#"
544 Ok(
545 Foo {
546 flatten_struct: Some(
547 Inner1 {
548 a: Some(
549 "b",
550 ),
551 b: None,
552 },
553 ),
554 flatten_map1: Some(
555 {
556 "c": "d",
557 "f": "g",
558 },
559 ),
560 flatten_struct2: None,
561 flatten_struct3: Some(
562 Inner3 {
563 e: None,
564 f: "g",
565 },
566 ),
567 },
568 )
569 "#]]
570 .assert_debug_eq(&foo);
571 }
572 }
573}