1#![allow(clippy::derive_partial_eq_without_eq)]
16#![warn(clippy::large_futures, clippy::large_stack_frames)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene)]
19#![feature(stmt_expr_attributes)]
20#![feature(box_patterns)]
21#![feature(trait_alias)]
22#![feature(box_into_inner)]
23#![feature(type_alias_impl_trait)]
24#![feature(associated_type_defaults)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(iter_from_coroutine)]
27#![feature(if_let_guard)]
28#![feature(iterator_try_collect)]
29#![feature(try_blocks)]
30#![feature(error_generic_member_access)]
31#![feature(negative_impls)]
32#![feature(register_tool)]
33#![feature(assert_matches)]
34#![feature(never_type)]
35#![feature(map_try_insert)]
36#![register_tool(rw)]
37#![recursion_limit = "256"]
38#![feature(min_specialization)]
39#![feature(custom_inner_attributes)]
40#![feature(iter_array_chunks)]
41
42use std::time::Duration;
43
44use duration_str::parse_std;
45use serde::de;
46
47pub mod aws_utils;
48
49pub mod allow_alter_on_fly_fields;
50
51mod enforce_secret;
52pub mod error;
53mod macros;
54
55pub mod parser;
56pub mod schema;
57pub mod sink;
58pub mod source;
59
60pub mod connector_common;
61
62pub use paste::paste;
63pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
64
65mod with_options;
66pub use with_options::{Get, GetKeyIter, WithOptionsSecResolved, WithPropertiesExt};
67
68#[cfg(test)]
69mod with_options_test;
70
// Shared string keys exported by this crate.
// NOTE(review): judging by the names, these look like connector `WITH`-option keys
// (schema-change toggle, sink table creation / naming) — confirm against the call sites.

/// Key string `"auto.schema.change"`.
pub const AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
/// Key string `"create_table_if_not_exists"`.
pub const SINK_CREATE_TABLE_IF_NOT_EXISTS_KEY: &str = "create_table_if_not_exists";
/// Key string `"table.name"`.
pub const SINK_TARGET_TABLE_NAME: &str = "table.name";
/// Key string `"intermediate.table.name"`.
pub const SINK_INTERMEDIATE_TABLE_NAME: &str = "intermediate.table.name";
75
76pub(crate) fn deserialize_u32_from_string<'de, D>(deserializer: D) -> Result<u32, D::Error>
77where
78 D: de::Deserializer<'de>,
79{
80 let s: String = de::Deserialize::deserialize(deserializer)?;
81 s.parse().map_err(|_| {
82 de::Error::invalid_value(
83 de::Unexpected::Str(&s),
84 &"integer greater than or equal to 0",
85 )
86 })
87}
88
89pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
90 deserializer: D,
91) -> std::result::Result<Option<Vec<String>>, D::Error>
92where
93 D: de::Deserializer<'de>,
94{
95 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
96 if let Some(s) = s {
97 let s = s.to_ascii_lowercase();
98 let s = s.split(',').map(|s| s.trim().to_owned()).collect();
99 Ok(Some(s))
100 } else {
101 Ok(None)
102 }
103}
104
105pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>(
106 deserializer: D,
107) -> std::result::Result<Option<Vec<u64>>, D::Error>
108where
109 D: de::Deserializer<'de>,
110{
111 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
112 if let Some(s) = s {
113 let numbers = s
114 .split(',')
115 .map(|s| s.trim().parse())
116 .collect::<Result<Vec<u64>, _>>()
117 .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number"));
118 Ok(Some(numbers?))
119 } else {
120 Ok(None)
121 }
122}
123
124pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
125where
126 D: de::Deserializer<'de>,
127{
128 let s: String = de::Deserialize::deserialize(deserializer)?;
129 let s = s.to_ascii_lowercase();
130 match s.as_str() {
131 "true" => Ok(true),
132 "false" => Ok(false),
133 _ => Err(de::Error::invalid_value(
134 de::Unexpected::Str(&s),
135 &"true or false",
136 )),
137 }
138}
139
140pub(crate) fn deserialize_optional_bool_from_string<'de, D>(
141 deserializer: D,
142) -> std::result::Result<Option<bool>, D::Error>
143where
144 D: de::Deserializer<'de>,
145{
146 let s: Option<String> = de::Deserialize::deserialize(deserializer)?;
147 if let Some(s) = s {
148 let s = s.to_ascii_lowercase();
149 match s.as_str() {
150 "true" => Ok(Some(true)),
151 "false" => Ok(Some(false)),
152 _ => Err(de::Error::invalid_value(
153 de::Unexpected::Str(&s),
154 &"true or false",
155 )),
156 }
157 } else {
158 Ok(None)
159 }
160}
161
162pub(crate) fn deserialize_duration_from_string<'de, D>(
163 deserializer: D,
164) -> Result<Duration, D::Error>
165where
166 D: de::Deserializer<'de>,
167{
168 let s: String = de::Deserialize::deserialize(deserializer)?;
169 parse_std(&s).map_err(|_| de::Error::invalid_value(
170 de::Unexpected::Str(&s),
171 &"The String value unit support for one of:[“y”,“mon”,“w”,“d”,“h”,“m”,“s”, “ms”, “µs”, “ns”]",
172 ))
173}
174
175#[cfg(test)]
176mod tests {
177 use expect_test::expect_file;
178
179 use crate::with_options_test::{
180 generate_allow_alter_on_fly_fields_combined, generate_with_options_yaml_connection,
181 generate_with_options_yaml_sink, generate_with_options_yaml_source,
182 };
183
184 #[test]
188 fn test_with_options_yaml_up_to_date() {
189 expect_file!("../with_options_source.yaml").assert_eq(&generate_with_options_yaml_source());
190
191 expect_file!("../with_options_sink.yaml").assert_eq(&generate_with_options_yaml_sink());
192
193 expect_file!("../with_options_connection.yaml")
194 .assert_eq(&generate_with_options_yaml_connection());
195 }
196
197 #[test]
199 fn test_allow_alter_on_fly_fields_rust_up_to_date() {
200 expect_file!("../src/allow_alter_on_fly_fields.rs")
201 .assert_eq(&generate_allow_alter_on_fly_fields_combined());
202 }
203
204 mod serde {
206 #![expect(dead_code)]
207
208 use std::collections::BTreeMap;
209
210 use expect_test::expect;
211 use serde::Deserialize;
212
        /// Snapshots how `#[serde(deny_unknown_fields)]` on the *outer* struct interacts with
        /// `#[serde(flatten)]` fields when deserializing through `serde_json`.
        ///
        /// As recorded by the expectations below:
        /// - flattening only a map is rejected, even for the single key `a`;
        /// - flattening only a struct works and still rejects unknown keys;
        /// - flattening a map and a struct together accepts `a` (it appears in both), while
        ///   an extra key is still rejected.
        #[test]
        fn test_outer_deny() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenMap {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct FlattenBoth {
                #[serde(flatten)]
                flatten: BTreeMap<String, String>,
                #[serde(flatten)]
                flatten_struct: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            // NOTE: the whitespace inside this literal is significant — the error position
            // asserted below (line 3, column 13) depends on it.
            let json = r#"{
                "a": "b"
            }"#;
            let foo: Result<FlattenMap, _> = serde_json::from_str(json);
            let foo1: Result<FlattenStruct, _> = serde_json::from_str(json);
            let foo2: Result<FlattenBoth, _> = serde_json::from_str(json);

            // With `deny_unknown_fields`, flattening ONLY a map fails: `a` is reported as unknown.
            expect![[r#"
                Err(
                    Error("unknown field `a`", line: 3, column: 13),
                )
            "#]]
            .assert_debug_eq(&foo);

            // Flattening a struct works.
            expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo1);
            // ...and an unknown key is still rejected for the flattened struct.
            let foo11: Result<FlattenStruct, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo11);

            // Flattening both a map and a struct succeeds; `a` shows up in both of them.
            expect![[r#"
                Ok(
                    FlattenBoth {
                        flatten: {
                            "a": "b",
                        },
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);

            // An extra key with a non-string value fails with a type error rather than an
            // "unknown field" error (presumably raised while filling the `String` map).
            let foo21: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":1 }"#);
            expect_test::expect![[r#"
                Err(
                    Error("invalid type: integer `1`, expected a string", line: 1, column: 25),
                )
            "#]]
            .assert_debug_eq(&foo21);
            // An extra key with a string value is rejected as an unknown field.
            let foo22: Result<FlattenBoth, _> =
                serde_json::from_str(r#"{ "a": "b", "unknown":"1" }"#);
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `unknown`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo22);
        }
325
        /// Snapshots what happens when `#[serde(deny_unknown_fields)]` is placed on the
        /// flattened *inner* struct only: the unknown key is accepted and dropped.
        #[test]
        fn test_inner_deny() {
            // No `deny_unknown_fields` on the outer struct.
            #[derive(Deserialize, Debug)]
            struct FlattenStruct {
                #[serde(flatten)]
                flatten_struct: Inner,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
            }

            let json = r#"{
                "a": "b", "unknown":1
            }"#;
            let foo: Result<FlattenStruct, _> = serde_json::from_str(json);
            // Deserialization succeeds even though `unknown` is not a field of `Inner`,
            // i.e. the inner `deny_unknown_fields` has no visible effect here.
            expect_test::expect![[r#"
                Ok(
                    FlattenStruct {
                        flatten_struct: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
362
        /// Snapshots how one JSON object is distributed across several `#[serde(flatten)]`
        /// fields (two structs and two maps) of the same outer struct.
        #[test]
        fn test_multiple_flatten() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                #[serde(flatten)]
                // Receives `a` (see the snapshot below).
                flatten_struct: Inner1,

                #[serde(flatten)]
                // Per the snapshot, this map holds `c` but not `a`.
                flatten_map1: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_map2: BTreeMap<String, String>,

                #[serde(flatten)]
                flatten_struct2: Inner2,
            }

            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; unused in this test.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Inner2 {
                c: Option<String>,
            }

            let json = r#"{
                "a": "b", "c":"d"
            }"#;
            let foo2: Result<Foo, _> = serde_json::from_str(json);

            // Both maps receive the same `c` entry, and `Inner2` also receives `c`;
            // `a` only appears in `Inner1`.
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Inner1 {
                            a: Some(
                                "b",
                            ),
                            b: None,
                        },
                        flatten_map1: {
                            "c": "d",
                        },
                        flatten_map2: {
                            "c": "d",
                        },
                        flatten_struct2: Inner2 {
                            c: Some(
                                "d",
                            ),
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }
430
        /// Snapshots `deny_unknown_fields` combined with a *nested* flatten, i.e. flattening
        /// a struct that itself contains a flattened field.
        #[test]
        fn test_nested_flatten() {
            #[derive(Deserialize, Debug)]
            #[serde(deny_unknown_fields)]
            struct Outer {
                #[serde(flatten)]
                inner: Inner,
            }

            #[derive(Deserialize, Debug)]
            struct Inner {
                a: Option<String>,
                b: Option<String>,
                #[serde(flatten)]
                nested: InnerInner,
            }

            #[derive(Deserialize, Debug)]
            struct InnerInner {
                c: Option<String>,
            }

            let json = r#"{ "a": "b", "unknown":"1" }"#;

            let foo: Result<Outer, _> = serde_json::from_str(json);

            // The reported unknown field is `a` — a field `Inner` does declare — rather than
            // `unknown`, so the outer `deny_unknown_fields` misbehaves with a nested flatten.
            expect_test::expect![[r#"
                Err(
                    Error("unknown field `a`", line: 1, column: 27),
                )
            "#]]
            .assert_debug_eq(&foo);

            // Without `deny_unknown_fields`, and with an additional flattened map.
            #[derive(Deserialize, Debug)]
            struct Outer2 {
                #[serde(flatten)]
                inner: Inner,
                #[serde(flatten)]
                // Per the snapshot, this map receives every key, including `a`, which
                // `inner` also receives.
                map: BTreeMap<String, String>,
            }
            let foo2: Result<Outer2, _> = serde_json::from_str(json);
            expect_test::expect![[r#"
                Ok(
                    Outer2 {
                        inner: Inner {
                            a: Some(
                                "b",
                            ),
                            b: None,
                            nested: InnerInner {
                                c: None,
                            },
                        },
                        map: {
                            "a": "b",
                            "unknown": "1",
                        },
                    },
                )
            "#]]
            .assert_debug_eq(&foo2);
        }
497
        /// Snapshots how `#[serde(flatten)]` behaves on `Option<_>` fields: which of the
        /// optional flattened structs/maps become `Some` and which become `None`.
        #[test]
        fn test_flatten_option() {
            #[derive(Deserialize, Debug)]
            struct Foo {
                #[serde(flatten)]
                // All fields optional; becomes `Some` in the snapshot.
                flatten_struct: Option<Inner1>,

                #[serde(flatten)]
                // Per the snapshot, becomes `Some` holding `c` and `f` (but not `a`).
                flatten_map1: Option<BTreeMap<String, String>>,

                #[serde(flatten)]
                // Required field `d` is absent from the input; becomes `None` in the snapshot.
                flatten_struct2: Option<Inner2>,

                #[serde(flatten)]
                // Required field `f` is present in the input; becomes `Some` in the snapshot.
                flatten_struct3: Option<Inner3>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner1 {
                a: Option<String>,
                b: Option<String>,
            }
            // Not referenced by `Foo`; unused in this test.
            #[derive(Deserialize, Debug)]
            struct Inner11 {
                c: Option<String>,
            }

            #[derive(Deserialize, Debug)]
            struct Inner2 {
                c: Option<String>,
                d: String,
            }

            #[derive(Deserialize, Debug)]
            struct Inner3 {
                e: Option<String>,
                f: String,
            }

            let json = r#"{
                "a": "b", "c": "d", "f": "g"
            }"#;
            let foo: Result<Foo, _> = serde_json::from_str(json);
            // A missing required field turns the optional flattened struct into `None`
            // instead of failing the whole deserialization.
            expect![[r#"
                Ok(
                    Foo {
                        flatten_struct: Some(
                            Inner1 {
                                a: Some(
                                    "b",
                                ),
                                b: None,
                            },
                        ),
                        flatten_map1: Some(
                            {
                                "c": "d",
                                "f": "g",
                            },
                        ),
                        flatten_struct2: None,
                        flatten_struct3: Some(
                            Inner3 {
                                e: None,
                                f: "g",
                            },
                        ),
                    },
                )
            "#]]
            .assert_debug_eq(&foo);
        }
575 }
576}