1#![expect(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(type_alias_impl_trait)]
22#![feature(associated_type_defaults)]
23#![feature(iter_from_coroutine)]
24#![feature(iterator_try_collect)]
25#![feature(try_blocks)]
26#![feature(error_generic_member_access)]
27#![feature(negative_impls)]
28#![feature(register_tool)]
29#![feature(never_type)]
30#![feature(map_try_insert)]
31#![register_tool(rw)]
32#![recursion_limit = "256"]
33#![feature(min_specialization)]
34#![feature(custom_inner_attributes)]
35#![feature(iter_array_chunks)]
36
37use std::time::Duration;
38
39use duration_str::parse_std;
40use serde::de;
41
42pub mod aws_utils;
43
44pub mod allow_alter_on_fly_fields;
45
46mod enforce_secret;
47pub mod error;
48mod macros;
49
50pub mod parser;
51pub mod schema;
52pub mod sink;
53pub mod source;
54
55pub mod connector_common;
56
57pub use paste::paste;
58pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
59
60mod with_options;
61pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
62
63#[cfg(test)]
64mod with_options_test;
65
66pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
67pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
68pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
69pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
70
71pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
72where
73 D: de::Deserializer<'de>,
74{
75 let s: String = de::Deserialize::deserialize(deserializer)?;
76 s.parse().map_err(|_| {
77 de::Error::invalid_value(
78 de::Unexpected::Str(&s),
79 &"integer greater than or equal to 0",
80 )
81 })
82}
83
84pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
85 deserializer: D,
86) -> std::result::Result<Option<Vec<String>>, D::Error>
87where
88 D: de::Deserializer<'de>,
89{
90 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
91 if let Some(s) = s {
92 let s = s.to_ascii_lowercase();
93 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
94 Ok(Some(s))
95 } else {
96 Ok(None)
97 }
98}
99
100pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
101 deserializer: D,
102) -> std::result::Result<Option<Vec<u64>>, D::Error>
103where
104 D: de::Deserializer<'de>,
105{
106 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
107 if let Some(s) = s {
108 let numbers = s
109 .split(',')
110 .map(|s| s.trim().parse())
111 .collect::<Result<Vec<u64>, _>>()
112 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
113 Ok(Some(numbers?))
114 } else {
115 Ok(None)
116 }
117}
118
119pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
120where
121 D: de::Deserializer<'de>,
122{
123 let s: String = de::Deserialize::deserialize(deserializer)?;
124 let s = s.to_ascii_lowercase();
125 match s.as_str() {
126 "true" => Ok(true),
127 "false" => Ok(false),
128 _ => Err(de::Error::invalid_value(
129 de::Unexpected::Str(&s),
130 &"true or false",
131 )),
132 }
133}
134
135pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
136 deserializer: D,
137) -> std::result::Result<Option<bool>, D::Error>
138where
139 D: de::Deserializer<'de>,
140{
141 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
142 if let Some(s) = s {
143 let s = s.to_ascii_lowercase();
144 match s.as_str() {
145 "true" => Ok(Some(true)),
146 "false" => Ok(Some(false)),
147 _ => Err(de::Error::invalid_value(
148 de::Unexpected::Str(&s),
149 &"true or false",
150 )),
151 }
152 } else {
153 Ok(None)
154 }
155}
156
157pub(crate) fn deserialize_duration_from_string<'de, D>(
158 deserializer: D,
159) -> Result<Duration, D::Error>
160where
161 D: de::Deserializer<'de>,
162{
163 let s: String = de::Deserialize::deserialize(deserializer)?;
164 parse_std(&s).map_err(|_| de::Error::invalid_value(
165 de::Unexpected::Str(&s),
166 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
167 ))
168}
169
170pub(crate) fn deserialize_optional_duration_from_string<'de, D>(
171 deserializer: D,
172) -> Result<Option<Duration>, D::Error>
173where
174 D: de::Deserializer<'de>,
175{
176 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
177 if let Some(s) = s {
178 let duration = parse_std(&s).map_err(|_| de::Error::invalid_value(
179 de::Unexpected::Str(&s),
180 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
181 ))?;
182 Ok(Some(duration))
183 } else {
184 Ok(None)
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use expect_test::expect_file;
191
192 use crate::with_options_test::{
193 generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
194 generate_with_options_yaml_sink, generate_with_options_yaml_source,
195 };
196
197 #[test]
201 fn test_with_options_yaml_up_to_date() {
202 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
203
204 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
205
206 expect_file!("../with_options_connection.yaml")
207 .assert_eq(&generate_with_options_yaml_connection());
208 }
209
210 #[test]
212 fn test_allow_alter_on_fly_fields_rust_up_to_date() {
213 expect_file!("../src/allow_alter_on_fly_fields.rs")
214 .assert_eq(&generate_allow_alter_on_fly_fields_combined());
215 }
216
217 mod serde {
219 #![expect(dead_code)]
220
221 use std::collections::BTreeMap;
222
223 use expect_test::expect;
224 use serde::Deserialize;
225
226 #[test]
234 fn test_outer_deny() {
235 #[derive(Deserialize, Debug)]
236 #[serde(deny_unknown_fields)]
237 struct FlattenMap {
238 #[serde(flatten)]
239 flatten: BTreeMap<String, String>,
240 }
241 #[derive(Deserialize, Debug)]
242 #[serde(deny_unknown_fields)]
243 struct FlattenStruct {
244 #[serde(flatten)]
245 flatten_struct: Inner,
246 }
247
248 #[derive(Deserialize, Debug)]
249 #[serde(deny_unknown_fields)]
250 struct FlattenBoth {
251 #[serde(flatten)]
252 flatten: BTreeMap<String, String>,
253 #[serde(flatten)]
254 flatten_struct: Inner,
255 }
256
257 #[derive(Deserialize, Debug)]
258 struct Inner {
259 a: Option<String>,
260 b: Option<String>,
261 }
262
263 let json = r#"{
264 "a": "b"
265 }"#;
266 let foo: Result<FlattenMap, _> = serde_json::from_str(json);
267 let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
268 let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);
269
270 expect![[r#"
272 Err(
273 Error("unknown field `a`", line: 3, column: 13),
274 )
275 "#]]
276 .assert_debug_eq(&foo);
277
278 expect![[r#"
280 Ok(
281 FlattenStruct {
282 flatten_struct: Inner {
283 a: Some(
284 "b",
285 ),
286 b: None,
287 },
288 },
289 )
290 "#]]
291 .assert_debug_eq(&foo1);
292 let foo11: Result<FlattenStruct, _> =
294 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
295 expect_test::expect![[r#"
296 Err(
297 Error("unknown field `unknown`", line: 1, column: 25),
298 )
299 "#]]
300 .assert_debug_eq(&foo11);
301
302 expect![[r#"
304 Ok(
305 FlattenBoth {
306 flatten: {
307 "a": "b",
308 },
309 flatten_struct: Inner {
310 a: Some(
311 "b",
312 ),
313 b: None,
314 },
315 },
316 )
317 "#]]
318 .assert_debug_eq(&foo2);
319
320 let foo21: Result<FlattenBoth, _> =
321 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
322 expect_test::expect![[r#"
323 Err(
324 Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
325 )
326 "#]]
327 .assert_debug_eq(&foo21);
328 let foo22: Result<FlattenBoth, _> =
330 serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
331 expect_test::expect![[r#"
332 Err(
333 Error("unknown field `unknown`", line: 1, column: 27),
334 )
335 "#]]
336 .assert_debug_eq(&foo22);
337 }
338
339 #[test]
340 fn test_inner_deny() {
341 #[derive(Deserialize, Debug)]
343 struct FlattenStruct {
344 #[serde(flatten)]
345 flatten_struct: Inner,
346 }
347 #[derive(Deserialize, Debug)]
348 #[serde(deny_unknown_fields)]
349 struct Inner {
350 a: Option<String>,
351 b: Option<String>,
352 }
353
354 let json = r#"{
355 "a": "b", "unknown":1
356 }"#;
357 let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
358 expect_test::expect![[r#"
362 Ok(
363 FlattenStruct {
364 flatten_struct: Inner {
365 a: Some(
366 "b",
367 ),
368 b: None,
369 },
370 },
371 )
372 "#]]
373 .assert_debug_eq(&foo);
374 }
375
376 #[test]
377 fn test_multiple_flatten() {
378 #[derive(Deserialize, Debug)]
379 struct Foo {
380 #[serde(flatten)]
382 flatten_struct: Inner1,
383
384 #[serde(flatten)]
386 flatten_map1: BTreeMap<String, String>,
387
388 #[serde(flatten)]
389 flatten_map2: BTreeMap<String, String>,
390
391 #[serde(flatten)]
392 flatten_struct2: Inner2,
393 }
394
395 #[derive(Deserialize, Debug)]
396 #[serde(deny_unknown_fields)]
397 struct Inner1 {
398 a: Option<String>,
399 b: Option<String>,
400 }
401 #[derive(Deserialize, Debug)]
402 struct Inner11 {
403 c: Option<String>,
404 }
405 #[derive(Deserialize, Debug)]
406 #[serde(deny_unknown_fields)]
407 struct Inner2 {
408 c: Option<String>,
409 }
410
411 let json = r#"{
412 "a": "b", "c":"d"
413 }"#;
414 let foo2: Result<Foo, _> = serde_json::from_str(json);
415
416 expect![[r#"
419 Ok(
420 Foo {
421 flatten_struct: Inner1 {
422 a: Some(
423 "b",
424 ),
425 b: None,
426 },
427 flatten_map1: {
428 "c": "d",
429 },
430 flatten_map2: {
431 "c": "d",
432 },
433 flatten_struct2: Inner2 {
434 c: Some(
435 "d",
436 ),
437 },
438 },
439 )
440 "#]]
441 .assert_debug_eq(&foo2);
442 }
443
444 #[test]
445 fn test_nested_flatten() {
446 #[derive(Deserialize, Debug)]
447 #[serde(deny_unknown_fields)]
448 struct Outer {
449 #[serde(flatten)]
450 inner: Inner,
451 }
452
453 #[derive(Deserialize, Debug)]
454 struct Inner {
455 a: Option<String>,
456 b: Option<String>,
457 #[serde(flatten)]
458 nested: InnerInner,
459 }
460
461 #[derive(Deserialize, Debug)]
462 struct InnerInner {
463 c: Option<String>,
464 }
465
466 let json = r#"{ "a": "b", "unknown":"1" }"#;
467
468 let foo: Result<Outer, _> = serde_json::from_str(json);
469
470 expect_test::expect![[r#"
472 Err(
473 Error("unknown field `a`", line: 1, column: 27),
474 )
475 "#]]
476 .assert_debug_eq(&foo);
477
478 #[derive(Deserialize, Debug)]
481 struct Outer2 {
482 #[serde(flatten)]
483 inner: Inner,
484 #[serde(flatten)]
486 map: BTreeMap<String, String>,
487 }
488 let foo2: Result<Outer2, _> = serde_json::from_str(json);
489 expect_test::expect![[r#"
490 Ok(
491 Outer2 {
492 inner: Inner {
493 a: Some(
494 "b",
495 ),
496 b: None,
497 nested: InnerInner {
498 c: None,
499 },
500 },
501 map: {
502 "a": "b",
503 "unknown": "1",
504 },
505 },
506 )
507 "#]]
508 .assert_debug_eq(&foo2);
509 }
510
511 #[test]
512 fn test_flatten_option() {
513 #[derive(Deserialize, Debug)]
514 struct Foo {
515 #[serde(flatten)]
517 flatten_struct: Option<Inner1>,
518
519 #[serde(flatten)]
521 flatten_map1: Option<BTreeMap<String, String>>,
522
523 #[serde(flatten)]
525 flatten_struct2: Option<Inner2>,
526
527 #[serde(flatten)]
530 flatten_struct3: Option<Inner3>,
531 }
532
533 #[derive(Deserialize, Debug)]
534 struct Inner1 {
535 a: Option<String>,
536 b: Option<String>,
537 }
538 #[derive(Deserialize, Debug)]
539 struct Inner11 {
540 c: Option<String>,
541 }
542
543 #[derive(Deserialize, Debug)]
544 struct Inner2 {
545 c: Option<String>,
546 d: String,
547 }
548
549 #[derive(Deserialize, Debug)]
550 struct Inner3 {
551 e: Option<String>,
552 f: String,
553 }
554
555 let json = r#"{
556 "a": "b", "c": "d", "f": "g"
557 }"#;
558 let foo: Result<Foo, _> = serde_json::from_str(json);
559 expect![[r#"
560 Ok(
561 Foo {
562 flatten_struct: Some(
563 Inner1 {
564 a: Some(
565 "b",
566 ),
567 b: None,
568 },
569 ),
570 flatten_map1: Some(
571 {
572 "c": "d",
573 "f": "g",
574 },
575 ),
576 flatten_struct2: None,
577 flatten_struct3: Some(
578 Inner3 {
579 e: None,
580 f: "g",
581 },
582 ),
583 },
584 )
585 "#]]
586 .assert_debug_eq(&foo);
587 }
588 }
589}