1#![allow(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(array_chunks)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene)]
20#![feature(stmt_expr_attributes)]
21#![feature(box_patterns)]
22#![feature(trait_alias)]
23#![feature(let_chains)]
24#![feature(box_into_inner)]
25#![feature(type_alias_impl_trait)]
26#![feature(associated_type_defaults)]
27#![feature(impl_trait_in_assoc_type)]
28#![feature(iter_from_coroutine)]
29#![feature(if_let_guard)]
30#![feature(iterator_try_collect)]
31#![feature(try_blocks)]
32#![feature(error_generic_member_access)]
33#![feature(negative_impls)]
34#![feature(register_tool)]
35#![feature(assert_matches)]
36#![feature(never_type)]
37#![feature(map_try_insert)]
38#![register_tool(rw)]
39#![recursion_limit = "256"]
40#![feature(min_specialization)]
41#![feature(custom_inner_attributes)]
42
43use std::time::Duration;
44
45use duration_str::parse_std;
46use serde::de;
47
48pub mod aws_utils;
49
50#[rustfmt::skip]
51pub mod allow_alter_on_fly_fields;
52
53mod enforce_secret;
54pub mod error;
55mod macros;
56
57pub mod parser;
58pub mod schema;
59pub mod sink;
60pub mod source;
61
62pub mod connector_common;
63
64pub use paste::paste;
65pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
66
67mod with_options;
68pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
69
70#[cfg(test)]
71mod with_options_test;
72
73pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
74pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
75pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
76pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
77
78pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
79where
80 D: de::Deserializer<'de>,
81{
82 let s: String = de::Deserialize::deserialize(deserializer)?;
83 s.parse().map_err(|_| {
84 de::Error::invalid_value(
85 de::Unexpected::Str(&s),
86 &"integer greater than or equal to 0",
87 )
88 })
89}
90
91pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
92 deserializer: D,
93) -> std::result::Result<Option<Vec<String>>, D::Error>
94where
95 D: de::Deserializer<'de>,
96{
97 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
98 if let Some(s) = s {
99 let s = s.to_ascii_lowercase();
100 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
101 Ok(Some(s))
102 } else {
103 Ok(None)
104 }
105}
106
107pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
108 deserializer: D,
109) -> std::result::Result<Option<Vec<u64>>, D::Error>
110where
111 D: de::Deserializer<'de>,
112{
113 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
114 if let Some(s) = s {
115 let numbers = s
116 .split(',')
117 .map(|s| s.trim().parse())
118 .collect::<Result<Vec<u64>, _>>()
119 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
120 Ok(Some(numbers?))
121 } else {
122 Ok(None)
123 }
124}
125
126pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
127where
128 D: de::Deserializer<'de>,
129{
130 let s: String = de::Deserialize::deserialize(deserializer)?;
131 let s = s.to_ascii_lowercase();
132 match s.as_str() {
133 "true" => Ok(true),
134 "false" => Ok(false),
135 _ => Err(de::Error::invalid_value(
136 de::Unexpected::Str(&s),
137 &"true or false",
138 )),
139 }
140}
141
142pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
143 deserializer: D,
144) -> std::result::Result<Option<bool>, D::Error>
145where
146 D: de::Deserializer<'de>,
147{
148 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
149 if let Some(s) = s {
150 let s = s.to_ascii_lowercase();
151 match s.as_str() {
152 "true" => Ok(Some(true)),
153 "false" => Ok(Some(false)),
154 _ => Err(de::Error::invalid_value(
155 de::Unexpected::Str(&s),
156 &"true or false",
157 )),
158 }
159 } else {
160 Ok(None)
161 }
162}
163
164pub(crate) fn deserialize_duration_from_string<'de, D>(
165 deserializer: D,
166) -> Result<Duration, D::Error>
167where
168 D: de::Deserializer<'de>,
169{
170 let s: String = de::Deserialize::deserialize(deserializer)?;
171 parse_std(&s).map_err(|_| de::Error::invalid_value(
172 de::Unexpected::Str(&s),
173 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
174 ))
175}
176
177#[cfg(test)]
178mod tests {
179 use expect_test::expect_file;
180
181 use crate::with_options_test::{
182 generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
183 generate_with_options_yaml_sink, generate_with_options_yaml_source,
184 };
185
186 #[test]
190 fn test_with_options_yaml_up_to_date() {
191 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
192
193 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
194
195 expect_file!("../with_options_connection.yaml")
196 .assert_eq(&generate_with_options_yaml_connection());
197 }
198
199 #[test]
201 fn test_allow_alter_on_fly_fields_rust_up_to_date() {
202 expect_file!("../src/allow_alter_on_fly_fields.rs")
203 .assert_eq(&generate_allow_alter_on_fly_fields_combined());
204 }
205
206 mod serde {
208 #![expect(dead_code)]
209
210 use std::collections::BTreeMap;
211
212 use expect_test::expect;
213 use serde::Deserialize;
214
215 #[test]
223 fn test_outer_deny() {
224 #[derive(Deserialize, Debug)]
225 #[serde(deny_unknown_fields)]
226 struct FlattenMap {
227 #[serde(flatten)]
228 flatten: BTreeMap<String, String>,
229 }
230 #[derive(Deserialize, Debug)]
231 #[serde(deny_unknown_fields)]
232 struct FlattenStruct {
233 #[serde(flatten)]
234 flatten_struct: Inner,
235 }
236
237 #[derive(Deserialize, Debug)]
238 #[serde(deny_unknown_fields)]
239 struct FlattenBoth {
240 #[serde(flatten)]
241 flatten: BTreeMap<String, String>,
242 #[serde(flatten)]
243 flatten_struct: Inner,
244 }
245
246 #[derive(Deserialize, Debug)]
247 struct Inner {
248 a: Option<String>,
249 b: Option<String>,
250 }
251
252 let json = r#"{
253 "a": "b"
254 }"#;
255 let foo: Result<FlattenMap, _> = serde_json::from_str(json);
256 let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
257 let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);
258
259 expect![[r#"
261 Err(
262 Error("unknown field `a`", line: 3, column: 13),
263 )
264 "#]]
265 .assert_debug_eq(&foo);
266
267 expect![[r#"
269 Ok(
270 FlattenStruct {
271 flatten_struct: Inner {
272 a: Some(
273 "b",
274 ),
275 b: None,
276 },
277 },
278 )
279 "#]]
280 .assert_debug_eq(&foo1);
281 let foo11: Result<FlattenStruct, _> =
283 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
284 expect_test::expect![[r#"
285 Err(
286 Error("unknown field `unknown`", line: 1, column: 25),
287 )
288 "#]]
289 .assert_debug_eq(&foo11);
290
291 expect![[r#"
293 Ok(
294 FlattenBoth {
295 flatten: {
296 "a": "b",
297 },
298 flatten_struct: Inner {
299 a: Some(
300 "b",
301 ),
302 b: None,
303 },
304 },
305 )
306 "#]]
307 .assert_debug_eq(&foo2);
308
309 let foo21: Result<FlattenBoth, _> =
310 serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
311 expect_test::expect![[r#"
312 Err(
313 Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
314 )
315 "#]]
316 .assert_debug_eq(&foo21);
317 let foo22: Result<FlattenBoth, _> =
319 serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
320 expect_test::expect![[r#"
321 Err(
322 Error("unknown field `unknown`", line: 1, column: 27),
323 )
324 "#]]
325 .assert_debug_eq(&foo22);
326 }
327
328 #[test]
329 fn test_inner_deny() {
330 #[derive(Deserialize, Debug)]
332 struct FlattenStruct {
333 #[serde(flatten)]
334 flatten_struct: Inner,
335 }
336 #[derive(Deserialize, Debug)]
337 #[serde(deny_unknown_fields)]
338 struct Inner {
339 a: Option<String>,
340 b: Option<String>,
341 }
342
343 let json = r#"{
344 "a": "b", "unknown":1
345 }"#;
346 let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
347 expect_test::expect![[r#"
351 Ok(
352 FlattenStruct {
353 flatten_struct: Inner {
354 a: Some(
355 "b",
356 ),
357 b: None,
358 },
359 },
360 )
361 "#]]
362 .assert_debug_eq(&foo);
363 }
364
365 #[test]
366 fn test_multiple_flatten() {
367 #[derive(Deserialize, Debug)]
368 struct Foo {
369 #[serde(flatten)]
371 flatten_struct: Inner1,
372
373 #[serde(flatten)]
375 flatten_map1: BTreeMap<String, String>,
376
377 #[serde(flatten)]
378 flatten_map2: BTreeMap<String, String>,
379
380 #[serde(flatten)]
381 flatten_struct2: Inner2,
382 }
383
384 #[derive(Deserialize, Debug)]
385 #[serde(deny_unknown_fields)]
386 struct Inner1 {
387 a: Option<String>,
388 b: Option<String>,
389 }
390 #[derive(Deserialize, Debug)]
391 struct Inner11 {
392 c: Option<String>,
393 }
394 #[derive(Deserialize, Debug)]
395 #[serde(deny_unknown_fields)]
396 struct Inner2 {
397 c: Option<String>,
398 }
399
400 let json = r#"{
401 "a": "b", "c":"d"
402 }"#;
403 let foo2: Result<Foo, _> = serde_json::from_str(json);
404
405 expect![[r#"
408 Ok(
409 Foo {
410 flatten_struct: Inner1 {
411 a: Some(
412 "b",
413 ),
414 b: None,
415 },
416 flatten_map1: {
417 "c": "d",
418 },
419 flatten_map2: {
420 "c": "d",
421 },
422 flatten_struct2: Inner2 {
423 c: Some(
424 "d",
425 ),
426 },
427 },
428 )
429 "#]]
430 .assert_debug_eq(&foo2);
431 }
432
433 #[test]
434 fn test_nested_flatten() {
435 #[derive(Deserialize, Debug)]
436 #[serde(deny_unknown_fields)]
437 struct Outer {
438 #[serde(flatten)]
439 inner: Inner,
440 }
441
442 #[derive(Deserialize, Debug)]
443 struct Inner {
444 a: Option<String>,
445 b: Option<String>,
446 #[serde(flatten)]
447 nested: InnerInner,
448 }
449
450 #[derive(Deserialize, Debug)]
451 struct InnerInner {
452 c: Option<String>,
453 }
454
455 let json = r#"{ "a": "b", "unknown":"1" }"#;
456
457 let foo: Result<Outer, _> = serde_json::from_str(json);
458
459 expect_test::expect![[r#"
461 Err(
462 Error("unknown field `a`", line: 1, column: 27),
463 )
464 "#]]
465 .assert_debug_eq(&foo);
466
467 #[derive(Deserialize, Debug)]
470 struct Outer2 {
471 #[serde(flatten)]
472 inner: Inner,
473 #[serde(flatten)]
475 map: BTreeMap<String, String>,
476 }
477 let foo2: Result<Outer2, _> = serde_json::from_str(json);
478 expect_test::expect![[r#"
479 Ok(
480 Outer2 {
481 inner: Inner {
482 a: Some(
483 "b",
484 ),
485 b: None,
486 nested: InnerInner {
487 c: None,
488 },
489 },
490 map: {
491 "a": "b",
492 "unknown": "1",
493 },
494 },
495 )
496 "#]]
497 .assert_debug_eq(&foo2);
498 }
499
500 #[test]
501 fn test_flatten_option() {
502 #[derive(Deserialize, Debug)]
503 struct Foo {
504 #[serde(flatten)]
506 flatten_struct: Option<Inner1>,
507
508 #[serde(flatten)]
510 flatten_map1: Option<BTreeMap<String, String>>,
511
512 #[serde(flatten)]
514 flatten_struct2: Option<Inner2>,
515
516 #[serde(flatten)]
519 flatten_struct3: Option<Inner3>,
520 }
521
522 #[derive(Deserialize, Debug)]
523 struct Inner1 {
524 a: Option<String>,
525 b: Option<String>,
526 }
527 #[derive(Deserialize, Debug)]
528 struct Inner11 {
529 c: Option<String>,
530 }
531
532 #[derive(Deserialize, Debug)]
533 struct Inner2 {
534 c: Option<String>,
535 d: String,
536 }
537
538 #[derive(Deserialize, Debug)]
539 struct Inner3 {
540 e: Option<String>,
541 f: String,
542 }
543
544 let json = r#"{
545 "a": "b", "c": "d", "f": "g"
546 }"#;
547 let foo: Result<Foo, _> = serde_json::from_str(json);
548 expect![[r#"
549 Ok(
550 Foo {
551 flatten_struct: Some(
552 Inner1 {
553 a: Some(
554 "b",
555 ),
556 b: None,
557 },
558 ),
559 flatten_map1: Some(
560 {
561 "c": "d",
562 "f": "g",
563 },
564 ),
565 flatten_struct2: None,
566 flatten_struct3: Some(
567 Inner3 {
568 e: None,
569 f: "g",
570 },
571 ),
572 },
573 )
574 "#]]
575 .assert_debug_eq(&foo);
576 }
577 }
578}