1#![allow(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(box_into_inner)]
23#![feature(type_alias_impl_trait)]
24#![feature(associated_type_defaults)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(iter_from_coroutine)]
27#![feature(if_let_guard)]
28#![feature(iterator_try_collect)]
29#![feature(try_blocks)]
30#![feature(error_generic_member_access)]
31#![feature(negative_impls)]
32#![feature(register_tool)]
33#![feature(assert_matches)]
34#![feature(never_type)]
35#![feature(map_try_insert)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39#![feature(custom_inner_attributes)]
40#![feature(iter_array_chunks)]
41
42use std::time::Duration;
43
44use duration_str::parse_std;
45use serde::de;
46
47pub mod aws_utils;
48
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
71pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
72pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
73pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
74pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
75
76pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
77where
78 D: de::Deserializer<'de>,
79{
80 let s: String = de::Deserialize::deserialize(deserializer)?;
81 s.parse().map_err(|_| {
82 de::Error::invalid_value(
83 de::Unexpected::Str(&s),
84 &"integer greater than or equal to 0",
85 )
86 })
87}
88
89pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
90 deserializer: D,
91) -> std::result::Result<Option<Vec<String>>, D::Error>
92where
93 D: de::Deserializer<'de>,
94{
95 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
96 if let Some(s) = s {
97 let s = s.to_ascii_lowercase();
98 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
99 Ok(Some(s))
100 } else {
101 Ok(None)
102 }
103}
104
105pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
106 deserializer: D,
107) -> std::result::Result<Option<Vec<u64>>, D::Error>
108where
109 D: de::Deserializer<'de>,
110{
111 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
112 if let Some(s) = s {
113 let numbers = s
114 .split(',')
115 .map(|s| s.trim().parse())
116 .collect::<Result<Vec<u64>, _>>()
117 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
118 Ok(Some(numbers?))
119 } else {
120 Ok(None)
121 }
122}
123
124pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
125where
126 D: de::Deserializer<'de>,
127{
128 let s: String = de::Deserialize::deserialize(deserializer)?;
129 let s = s.to_ascii_lowercase();
130 match s.as_str() {
131 "true" => Ok(true),
132 "false" => Ok(false),
133 _ => Err(de::Error::invalid_value(
134 de::Unexpected::Str(&s),
135 &"true or false",
136 )),
137 }
138}
139
140pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
141 deserializer: D,
142) -> std::result::Result<Option<bool>, D::Error>
143where
144 D: de::Deserializer<'de>,
145{
146 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
147 if let Some(s) = s {
148 let s = s.to_ascii_lowercase();
149 match s.as_str() {
150 "true" => Ok(Some(true)),
151 "false" => Ok(Some(false)),
152 _ => Err(de::Error::invalid_value(
153 de::Unexpected::Str(&s),
154 &"true or false",
155 )),
156 }
157 } else {
158 Ok(None)
159 }
160}
161
162pub(crate) fn deserialize_duration_from_string<'de, D>(
163 deserializer: D,
164) -> Result<Duration, D::Error>
165where
166 D: de::Deserializer<'de>,
167{
168 let s: String = de::Deserialize::deserialize(deserializer)?;
169 parse_std(&s).map_err(|_| de::Error::invalid_value(
170 de::Unexpected::Str(&s),
171 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
172 ))
173}
174
175pub(crate) fn deserialize_optional_duration_from_string<'de, D>(
176 deserializer: D,
177) -> Result<Option<Duration>, D::Error>
178where
179 D: de::Deserializer<'de>,
180{
181 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
182 if let Some(s) = s {
183 let duration = parse_std(&s).map_err(|_| de::Error::invalid_value(
184 de::Unexpected::Str(&s),
185 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
186 ))?;
187 Ok(Some(duration))
188 } else {
189 Ok(None)
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use expect_test::expect_file;
196
197 use crate::with_options_test::{
198 generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
199 generate_with_options_yaml_sink, generate_with_options_yaml_source,
200 };
201
202 #[test]
206 fn test_with_options_yaml_up_to_date() {
207 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
208
209 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
210
211 expect_file!("../with_options_connection.yaml")
212 .assert_eq(&generate_with_options_yaml_connection());
213 }
214
215 #[test]
217 fn test_allow_alter_on_fly_fields_rust_up_to_date() {
218 expect_file!("../src/allow_alter_on_fly_fields.rs")
219 .assert_eq(&generate_allow_alter_on_fly_fields_combined());
220 }
221
222 mod serde {
224 #![expect(dead_code)]
225
226 use std::collections::BTreeMap;
227
228 use expect_test::expect;
229 use serde::Deserialize;
230
231 #[test]
239 fn test_outer_deny() {
240 #[derive(Deserialize, Debug)]
241 #[serde(deny_unknown_fields)]
242 struct FlattenMap {
243 #[serde(flatten)]
244 flatten: BTreeMap<String, String>,
245 }
246 #[derive(Deserialize, Debug)]
247 #[serde(deny_unknown_fields)]
248 struct FlattenStruct {
249 #[serde(flatten)]
250 flatten_struct: Inner,
251 }
252
253 #[derive(Deserialize, Debug)]
254 #[serde(deny_unknown_fields)]
255 struct FlattenBoth {
256 #[serde(flatten)]
257 flatten: BTreeMap<String, String>,
258 #[serde(flatten)]
259 flatten_struct: Inner,
260 }
261
262 #[derive(Deserialize, Debug)]
263 struct Inner {
264 a: Option<String>,
265 b: Option<String>,
266 }
267
268 let json = r#"{
269 "a": "b"
270 }"#;
271 let foo: Result<FlattenMap, _> = serde_json::from_str(json);
272 let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
273 let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);
274
275 expect![[r#"
277 Err(
278 Error("unknown field `a`", line: 3, column: 13),
279 )
280 "#]]
281 .assert_debug_eq(&foo);
282
283 expect![[r#"
285 Ok(
286 FlattenStruct {
287 flatten_struct: Inner {
288 a: Some(
289 "b",
290 ),
291 b: None,
292 },
293 },
294 )
295 "#]]
296 .assert_debug_eq(&foo1);
297 let foo11: Result<FlattenStruct, _> =
299 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
300 expect_test::expect![[r#"
301 Err(
302 Error("unknown field `unknown`", line: 1, column: 25),
303 )
304 "#]]
305 .assert_debug_eq(&foo11);
306
307 expect![[r#"
309 Ok(
310 FlattenBoth {
311 flatten: {
312 "a": "b",
313 },
314 flatten_struct: Inner {
315 a: Some(
316 "b",
317 ),
318 b: None,
319 },
320 },
321 )
322 "#]]
323 .assert_debug_eq(&foo2);
324
325 let foo21: Result<FlattenBoth, _> =
326 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
327 expect_test::expect![[r#"
328 Err(
329 Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
330 )
331 "#]]
332 .assert_debug_eq(&foo21);
333 let foo22: Result<FlattenBoth, _> =
335 serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
336 expect_test::expect![[r#"
337 Err(
338 Error("unknown field `unknown`", line: 1, column: 27),
339 )
340 "#]]
341 .assert_debug_eq(&foo22);
342 }
343
344 #[test]
345 fn test_inner_deny() {
346 #[derive(Deserialize, Debug)]
348 struct FlattenStruct {
349 #[serde(flatten)]
350 flatten_struct: Inner,
351 }
352 #[derive(Deserialize, Debug)]
353 #[serde(deny_unknown_fields)]
354 struct Inner {
355 a: Option<String>,
356 b: Option<String>,
357 }
358
359 let json = r#"{
360 "a": "b", "unknown":1
361 }"#;
362 let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
363 expect_test::expect![[r#"
367 Ok(
368 FlattenStruct {
369 flatten_struct: Inner {
370 a: Some(
371 "b",
372 ),
373 b: None,
374 },
375 },
376 )
377 "#]]
378 .assert_debug_eq(&foo);
379 }
380
381 #[test]
382 fn test_multiple_flatten() {
383 #[derive(Deserialize, Debug)]
384 struct Foo {
385 #[serde(flatten)]
387 flatten_struct: Inner1,
388
389 #[serde(flatten)]
391 flatten_map1: BTreeMap<String, String>,
392
393 #[serde(flatten)]
394 flatten_map2: BTreeMap<String, String>,
395
396 #[serde(flatten)]
397 flatten_struct2: Inner2,
398 }
399
400 #[derive(Deserialize, Debug)]
401 #[serde(deny_unknown_fields)]
402 struct Inner1 {
403 a: Option<String>,
404 b: Option<String>,
405 }
406 #[derive(Deserialize, Debug)]
407 struct Inner11 {
408 c: Option<String>,
409 }
410 #[derive(Deserialize, Debug)]
411 #[serde(deny_unknown_fields)]
412 struct Inner2 {
413 c: Option<String>,
414 }
415
416 let json = r#"{
417 "a": "b", "c":"d"
418 }"#;
419 let foo2: Result<Foo, _> = serde_json::from_str(json);
420
421 expect![[r#"
424 Ok(
425 Foo {
426 flatten_struct: Inner1 {
427 a: Some(
428 "b",
429 ),
430 b: None,
431 },
432 flatten_map1: {
433 "c": "d",
434 },
435 flatten_map2: {
436 "c": "d",
437 },
438 flatten_struct2: Inner2 {
439 c: Some(
440 "d",
441 ),
442 },
443 },
444 )
445 "#]]
446 .assert_debug_eq(&foo2);
447 }
448
449 #[test]
450 fn test_nested_flatten() {
451 #[derive(Deserialize, Debug)]
452 #[serde(deny_unknown_fields)]
453 struct Outer {
454 #[serde(flatten)]
455 inner: Inner,
456 }
457
458 #[derive(Deserialize, Debug)]
459 struct Inner {
460 a: Option<String>,
461 b: Option<String>,
462 #[serde(flatten)]
463 nested: InnerInner,
464 }
465
466 #[derive(Deserialize, Debug)]
467 struct InnerInner {
468 c: Option<String>,
469 }
470
471 let json = r#"{ "a": "b", "unknown":"1" }"#;
472
473 let foo: Result<Outer, _> = serde_json::from_str(json);
474
475 expect_test::expect![[r#"
477 Err(
478 Error("unknown field `a`", line: 1, column: 27),
479 )
480 "#]]
481 .assert_debug_eq(&foo);
482
483 #[derive(Deserialize, Debug)]
486 struct Outer2 {
487 #[serde(flatten)]
488 inner: Inner,
489 #[serde(flatten)]
491 map: BTreeMap<String, String>,
492 }
493 let foo2: Result<Outer2, _> = serde_json::from_str(json);
494 expect_test::expect![[r#"
495 Ok(
496 Outer2 {
497 inner: Inner {
498 a: Some(
499 "b",
500 ),
501 b: None,
502 nested: InnerInner {
503 c: None,
504 },
505 },
506 map: {
507 "a": "b",
508 "unknown": "1",
509 },
510 },
511 )
512 "#]]
513 .assert_debug_eq(&foo2);
514 }
515
516 #[test]
517 fn test_flatten_option() {
518 #[derive(Deserialize, Debug)]
519 struct Foo {
520 #[serde(flatten)]
522 flatten_struct: Option<Inner1>,
523
524 #[serde(flatten)]
526 flatten_map1: Option<BTreeMap<String, String>>,
527
528 #[serde(flatten)]
530 flatten_struct2: Option<Inner2>,
531
532 #[serde(flatten)]
535 flatten_struct3: Option<Inner3>,
536 }
537
538 #[derive(Deserialize, Debug)]
539 struct Inner1 {
540 a: Option<String>,
541 b: Option<String>,
542 }
543 #[derive(Deserialize, Debug)]
544 struct Inner11 {
545 c: Option<String>,
546 }
547
548 #[derive(Deserialize, Debug)]
549 struct Inner2 {
550 c: Option<String>,
551 d: String,
552 }
553
554 #[derive(Deserialize, Debug)]
555 struct Inner3 {
556 e: Option<String>,
557 f: String,
558 }
559
560 let json = r#"{
561 "a": "b", "c": "d", "f": "g"
562 }"#;
563 let foo: Result<Foo, _> = serde_json::from_str(json);
564 expect![[r#"
565 Ok(
566 Foo {
567 flatten_struct: Some(
568 Inner1 {
569 a: Some(
570 "b",
571 ),
572 b: None,
573 },
574 ),
575 flatten_map1: Some(
576 {
577 "c": "d",
578 "f": "g",
579 },
580 ),
581 flatten_struct2: None,
582 flatten_struct3: Some(
583 Inner3 {
584 e: None,
585 f: "g",
586 },
587 ),
588 },
589 )
590 "#]]
591 .assert_debug_eq(&foo);
592 }
593 }
594}