1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{Datelike, NaiveDateTime, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33 CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34 Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39 time_handling_mode: TimeHandlingMode,
40 date_handling_mode: DateHandlingMode,
41 timestamp_handling_mode: TimestampHandlingMode,
42 timestamptz_handling_mode: TimestamptzHandlingMode,
43 custom_json_type: CustomJsonType,
44 jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48 schema: Schema,
49 col_indices: Option<Vec<usize>>,
50 kafka_connect: Option<KafkaConnectParamsRef>,
51 config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55 pub fn new(
56 schema: Schema,
57 col_indices: Option<Vec<usize>>,
58 date_handling_mode: DateHandlingMode,
59 timestamp_handling_mode: TimestampHandlingMode,
60 timestamptz_handling_mode: TimestamptzHandlingMode,
61 time_handling_mode: TimeHandlingMode,
62 jsonb_handling_mode: JsonbHandlingMode,
63 ) -> Self {
64 let config = JsonEncoderConfig {
65 time_handling_mode,
66 date_handling_mode,
67 timestamp_handling_mode,
68 timestamptz_handling_mode,
69 custom_json_type: CustomJsonType::None,
70 jsonb_handling_mode,
71 };
72 Self {
73 schema,
74 col_indices,
75 kafka_connect: None,
76 config,
77 }
78 }
79
80 pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81 let config = JsonEncoderConfig {
82 time_handling_mode: TimeHandlingMode::String,
83 date_handling_mode: DateHandlingMode::String,
84 timestamp_handling_mode: TimestampHandlingMode::String,
85 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86 custom_json_type: CustomJsonType::Es,
87 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88 };
89 Self {
90 schema,
91 col_indices,
92 kafka_connect: None,
93 config,
94 }
95 }
96
97 pub fn new_with_doris(
98 schema: Schema,
99 col_indices: Option<Vec<usize>>,
100 map: HashMap<String, u8>,
101 ) -> Self {
102 let config = JsonEncoderConfig {
103 time_handling_mode: TimeHandlingMode::Milli,
104 date_handling_mode: DateHandlingMode::String,
105 timestamp_handling_mode: TimestampHandlingMode::String,
106 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107 custom_json_type: CustomJsonType::Doris(map),
108 jsonb_handling_mode: JsonbHandlingMode::String,
109 };
110 Self {
111 schema,
112 col_indices,
113 kafka_connect: None,
114 config,
115 }
116 }
117
118 pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119 let config = JsonEncoderConfig {
120 time_handling_mode: TimeHandlingMode::Milli,
121 date_handling_mode: DateHandlingMode::String,
122 timestamp_handling_mode: TimestampHandlingMode::String,
123 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124 custom_json_type: CustomJsonType::StarRocks,
125 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126 };
127 Self {
128 schema,
129 col_indices,
130 kafka_connect: None,
131 config,
132 }
133 }
134
135 pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136 Self {
137 kafka_connect: Some(Arc::new(kafka_connect)),
138 ..self
139 }
140 }
141}
142
143impl RowEncoder for JsonEncoder {
144 type Output = Map<String, Value>;
145
146 fn schema(&self) -> &Schema {
147 &self.schema
148 }
149
150 fn col_indices(&self) -> Option<&[usize]> {
151 self.col_indices.as_ref().map(Vec::as_ref)
152 }
153
154 fn encode_cols(
155 &self,
156 row: impl Row,
157 col_indices: impl Iterator<Item = usize>,
158 ) -> Result<Self::Output> {
159 let mut mappings = Map::with_capacity(self.schema.len());
160 let col_indices = col_indices.collect_vec();
161 for idx in &col_indices {
162 let field = &self.schema[*idx];
163 let key = field.name.clone();
164 let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165 .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166 mappings.insert(key, value);
167 }
168
169 Ok(if let Some(param) = &self.kafka_connect {
170 json_converter_with_schema(
171 Value::Object(mappings),
172 param.schema_name.to_owned(),
173 col_indices.into_iter().map(|i| &self.schema[i]),
174 )
175 } else {
176 mappings
177 })
178 }
179}
180
181impl SerTo<String> for Map<String, Value> {
182 fn ser_to(self) -> Result<String> {
183 Value::Object(self).ser_to()
184 }
185}
186
187impl SerTo<String> for Value {
188 fn ser_to(self) -> Result<String> {
189 Ok(self.to_string())
190 }
191}
192
193fn datum_to_json_object(
194 field: &Field,
195 datum: DatumRef<'_>,
196 config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198 let scalar_ref = match datum {
199 None => {
200 return Ok(Value::Null);
201 }
202 Some(datum) => datum,
203 };
204
205 let data_type = field.data_type();
206
207 tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209 let value = match (data_type, scalar_ref) {
210 (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211 json!(v)
212 }
213 (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214 json!(v)
215 }
216 (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217 json!(v)
218 }
219 (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220 json!(v)
221 }
222 (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223 json!(format!("{:#018x}", v.into_inner()))
225 }
226 (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227 json!(f32::from(v))
228 }
229 (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230 json!(f64::from(v))
231 }
232 (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233 json!(v)
234 }
235 (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237 CustomJsonType::Doris(map) => {
238 let s = map.get(&field.name).unwrap();
239 v.rescale(*s as u32);
240 json!(v.to_text())
241 }
242 CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243 json!(v.to_text())
244 }
245 },
246 (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247 match config.timestamptz_handling_mode {
248 TimestamptzHandlingMode::UtcString => {
249 let parsed = v.to_datetime_utc();
250 let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251 json!(v)
252 }
253 TimestamptzHandlingMode::UtcWithoutSuffix => {
254 let parsed = v.to_datetime_utc().naive_utc();
255 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256 json!(v)
257 }
258 TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259 TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260 }
261 }
262 (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263 TimeHandlingMode::Milli => {
264 json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266 }
267 TimeHandlingMode::String => {
268 let a = v.0.format("%H:%M:%S%.6f").to_string();
269 json!(a)
270 }
271 },
272 (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273 DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274 DateHandlingMode::FromEpoch => {
275 let duration = v.0 - NaiveDateTime::UNIX_EPOCH.date();
276 json!(duration.num_days())
277 }
278 DateHandlingMode::String => {
279 let a = v.0.format("%Y-%m-%d").to_string();
280 json!(a)
281 }
282 },
283 (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284 match config.timestamp_handling_mode {
285 TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286 TimestampHandlingMode::String => {
287 json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288 }
289 }
290 }
291 (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292 json!(general_purpose::STANDARD.encode(v))
293 }
294 (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296 json!(v.as_iso_8601())
297 }
298
299 (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300 JsonbHandlingMode::String => {
301 json!(jsonb_ref.to_string())
302 }
303 JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304 },
305 (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306 let elems = list_ref.iter();
307 let mut vec = Vec::with_capacity(elems.len());
308 let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309 for sub_datum_ref in elems {
310 let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311 vec.push(value);
312 }
313 json!(vec)
314 }
315 (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
316 match config.custom_json_type {
317 CustomJsonType::Doris(_) => {
318 let mut map = IndexMap::with_capacity(st.len());
320 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
321 st.iter()
322 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
323 ) {
324 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
325 map.insert(sub_field.name.clone(), value);
326 }
327 Value::String(
328 serde_json::to_string(&map).context("failed to serialize into JSON")?,
329 )
330 }
331 CustomJsonType::StarRocks => {
332 return Err(ArrayError::internal(
333 "starrocks can't support struct".to_owned(),
334 ));
335 }
336 CustomJsonType::Es | CustomJsonType::None => {
337 let mut map = Map::with_capacity(st.len());
338 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
339 st.iter()
340 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
341 ) {
342 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
343 map.insert(sub_field.name.clone(), value);
344 }
345 json!(map)
346 }
347 }
348 }
349 (data_type, scalar_ref) => {
351 return Err(ArrayError::internal(format!(
352 "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
353 field.name, data_type, scalar_ref
354 )));
355 }
356 };
357
358 Ok(value)
359}
360
361fn json_converter_with_schema<'a>(
362 object: Value,
363 name: String,
364 fields: impl Iterator<Item = &'a Field>,
365) -> Map<String, Value> {
366 let mut mapping = Map::with_capacity(2);
367 mapping.insert(
368 "schema".to_owned(),
369 json!({
370 "type": "struct",
371 "fields": fields.map(|field| {
372 let mut mapping = type_as_json_schema(&field.data_type);
373 mapping.insert("field".to_owned(), json!(field.name));
374 mapping
375 }).collect_vec(),
376 "optional": false,
377 "name": name,
378 }),
379 );
380 mapping.insert("payload".to_owned(), object);
381 mapping
382}
383
384pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
386 match rw_type {
387 DataType::Boolean => "boolean",
388 DataType::Int16 => "int16",
389 DataType::Int32 => "int32",
390 DataType::Int64 => "int64",
391 DataType::Float32 => "float",
392 DataType::Float64 => "double",
393 DataType::Decimal => "string",
394 DataType::Date => "int32",
395 DataType::Varchar => "string",
396 DataType::Time => "int64",
397 DataType::Timestamp => "int64",
398 DataType::Timestamptz => "string",
399 DataType::Interval => "string",
400 DataType::Struct(_) => "struct",
401 DataType::List(_) => "array",
402 DataType::Bytea => "bytes",
403 DataType::Jsonb => "string",
404 DataType::Serial => "string",
405 DataType::Int256 => "string",
406 DataType::Map(_) => "map",
407 }
408}
409
410fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
411 let mut mapping = Map::with_capacity(4); mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
413 mapping.insert("optional".to_owned(), json!(true));
414 match rw_type {
415 DataType::Struct(struct_type) => {
416 let sub_fields = struct_type
417 .iter()
418 .map(|(sub_name, sub_type)| {
419 let mut sub_mapping = type_as_json_schema(sub_type);
420 sub_mapping.insert("field".to_owned(), json!(sub_name));
421 sub_mapping
422 })
423 .collect_vec();
424 mapping.insert("fields".to_owned(), json!(sub_fields));
425 }
426 DataType::List(sub_type) => {
427 mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
428 }
429 _ => {}
430 }
431
432 mapping
433}
434
435#[cfg(test)]
436mod tests {
437 use risingwave_common::types::{
438 Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
439 Timestamp,
440 };
441
442 use super::*;
443
444 #[test]
445 fn test_to_json_basic_type() {
446 let mock_field = Field {
447 data_type: DataType::Boolean,
448 name: Default::default(),
449 };
450
451 let config = JsonEncoderConfig {
452 time_handling_mode: TimeHandlingMode::Milli,
453 date_handling_mode: DateHandlingMode::FromCe,
454 timestamp_handling_mode: TimestampHandlingMode::String,
455 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
456 custom_json_type: CustomJsonType::None,
457 jsonb_handling_mode: JsonbHandlingMode::String,
458 };
459
460 let boolean_value = datum_to_json_object(
461 &Field {
462 data_type: DataType::Boolean,
463 ..mock_field.clone()
464 },
465 Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
466 &config,
467 )
468 .unwrap();
469 assert_eq!(boolean_value, json!(false));
470
471 let int16_value = datum_to_json_object(
472 &Field {
473 data_type: DataType::Int16,
474 ..mock_field.clone()
475 },
476 Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
477 &config,
478 )
479 .unwrap();
480 assert_eq!(int16_value, json!(16));
481
482 let int64_value = datum_to_json_object(
483 &Field {
484 data_type: DataType::Int64,
485 ..mock_field.clone()
486 },
487 Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
488 &config,
489 )
490 .unwrap();
491 assert_eq!(
492 serde_json::to_string(&int64_value).unwrap(),
493 i64::MAX.to_string()
494 );
495
496 let serial_value = datum_to_json_object(
497 &Field {
498 data_type: DataType::Serial,
499 ..mock_field.clone()
500 },
501 Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
502 &config,
503 )
504 .unwrap();
505 assert_eq!(
506 serde_json::to_string(&serial_value).unwrap(),
507 format!("\"{:#018x}\"", i64::MAX)
508 );
509
510 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
512 let tstz_value = datum_to_json_object(
513 &Field {
514 data_type: DataType::Timestamptz,
515 ..mock_field.clone()
516 },
517 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
518 &config,
519 )
520 .unwrap();
521 assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");
522
523 let unix_wo_suffix_config = JsonEncoderConfig {
524 time_handling_mode: TimeHandlingMode::Milli,
525 date_handling_mode: DateHandlingMode::FromCe,
526 timestamp_handling_mode: TimestampHandlingMode::String,
527 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
528 custom_json_type: CustomJsonType::None,
529 jsonb_handling_mode: JsonbHandlingMode::String,
530 };
531
532 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
533 let tstz_value = datum_to_json_object(
534 &Field {
535 data_type: DataType::Timestamptz,
536 ..mock_field.clone()
537 },
538 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
539 &unix_wo_suffix_config,
540 )
541 .unwrap();
542 assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
543
544 let timestamp_milli_config = JsonEncoderConfig {
545 time_handling_mode: TimeHandlingMode::String,
546 date_handling_mode: DateHandlingMode::FromCe,
547 timestamp_handling_mode: TimestampHandlingMode::Milli,
548 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
549 custom_json_type: CustomJsonType::None,
550 jsonb_handling_mode: JsonbHandlingMode::String,
551 };
552 let ts_value = datum_to_json_object(
553 &Field {
554 data_type: DataType::Timestamp,
555 ..mock_field.clone()
556 },
557 Some(
558 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
559 .as_scalar_ref_impl(),
560 ),
561 ×tamp_milli_config,
562 )
563 .unwrap();
564 assert_eq!(ts_value, json!(1000 * 1000));
565
566 let ts_value = datum_to_json_object(
567 &Field {
568 data_type: DataType::Timestamp,
569 ..mock_field.clone()
570 },
571 Some(
572 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
573 .as_scalar_ref_impl(),
574 ),
575 &config,
576 )
577 .unwrap();
578 assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));
579
580 let time_value = datum_to_json_object(
582 &Field {
583 data_type: DataType::Time,
584 ..mock_field.clone()
585 },
586 Some(
587 ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
588 .as_scalar_ref_impl(),
589 ),
590 &config,
591 )
592 .unwrap();
593 assert_eq!(time_value, json!(1000 * 1000));
594
595 let interval_value = datum_to_json_object(
596 &Field {
597 data_type: DataType::Interval,
598 ..mock_field.clone()
599 },
600 Some(
601 ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
602 .as_scalar_ref_impl(),
603 ),
604 &config,
605 )
606 .unwrap();
607 assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
608
609 let mut map = HashMap::default();
610 map.insert("aaa".to_owned(), 5_u8);
611 let doris_config = JsonEncoderConfig {
612 time_handling_mode: TimeHandlingMode::String,
613 date_handling_mode: DateHandlingMode::String,
614 timestamp_handling_mode: TimestampHandlingMode::String,
615 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
616 custom_json_type: CustomJsonType::Doris(map),
617 jsonb_handling_mode: JsonbHandlingMode::String,
618 };
619 let decimal = datum_to_json_object(
620 &Field {
621 data_type: DataType::Decimal,
622 name: "aaa".to_owned(),
623 },
624 Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
625 &doris_config,
626 )
627 .unwrap();
628 assert_eq!(decimal, json!("1.11111"));
629
630 let date_value = datum_to_json_object(
631 &Field {
632 data_type: DataType::Date,
633 ..mock_field.clone()
634 },
635 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
636 &config,
637 )
638 .unwrap();
639 assert_eq!(date_value, json!(719163));
640
641 let from_epoch_config = JsonEncoderConfig {
642 time_handling_mode: TimeHandlingMode::String,
643 date_handling_mode: DateHandlingMode::FromEpoch,
644 timestamp_handling_mode: TimestampHandlingMode::String,
645 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
646 custom_json_type: CustomJsonType::None,
647 jsonb_handling_mode: JsonbHandlingMode::String,
648 };
649 let date_value = datum_to_json_object(
650 &Field {
651 data_type: DataType::Date,
652 ..mock_field.clone()
653 },
654 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
655 &from_epoch_config,
656 )
657 .unwrap();
658 assert_eq!(date_value, json!(0));
659
660 let doris_config = JsonEncoderConfig {
661 time_handling_mode: TimeHandlingMode::String,
662 date_handling_mode: DateHandlingMode::String,
663 timestamp_handling_mode: TimestampHandlingMode::String,
664 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
665 custom_json_type: CustomJsonType::Doris(HashMap::default()),
666 jsonb_handling_mode: JsonbHandlingMode::String,
667 };
668 let date_value = datum_to_json_object(
669 &Field {
670 data_type: DataType::Date,
671 ..mock_field.clone()
672 },
673 Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
674 &doris_config,
675 )
676 .unwrap();
677 assert_eq!(date_value, json!("2010-10-10"));
678
679 let value = StructValue::new(vec![
680 Some(3_i32.to_scalar_value()),
681 Some(2_i32.to_scalar_value()),
682 Some(1_i32.to_scalar_value()),
683 ]);
684
685 let interval_value = datum_to_json_object(
686 &Field {
687 data_type: DataType::Struct(StructType::new(vec![
688 ("v3", DataType::Int32),
689 ("v2", DataType::Int32),
690 ("v1", DataType::Int32),
691 ])),
692 ..mock_field.clone()
693 },
694 Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
695 &doris_config,
696 )
697 .unwrap();
698 assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
699
700 let encode_jsonb_obj_config = JsonEncoderConfig {
701 time_handling_mode: TimeHandlingMode::String,
702 date_handling_mode: DateHandlingMode::String,
703 timestamp_handling_mode: TimestampHandlingMode::String,
704 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
705 custom_json_type: CustomJsonType::None,
706 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
707 };
708 let json_value = datum_to_json_object(
709 &Field {
710 data_type: DataType::Jsonb,
711 ..mock_field.clone()
712 },
713 Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
714 &encode_jsonb_obj_config,
715 )
716 .unwrap();
717 assert_eq!(json_value, json!([1, 2, 3]));
718 }
719
720 #[test]
721 fn test_generate_json_converter_schema() {
722 let fields = vec![
723 Field {
724 data_type: DataType::Boolean,
725 name: "v1".into(),
726 },
727 Field {
728 data_type: DataType::Int16,
729 name: "v2".into(),
730 },
731 Field {
732 data_type: DataType::Int32,
733 name: "v3".into(),
734 },
735 Field {
736 data_type: DataType::Float32,
737 name: "v4".into(),
738 },
739 Field {
740 data_type: DataType::Decimal,
741 name: "v5".into(),
742 },
743 Field {
744 data_type: DataType::Date,
745 name: "v6".into(),
746 },
747 Field {
748 data_type: DataType::Varchar,
749 name: "v7".into(),
750 },
751 Field {
752 data_type: DataType::Time,
753 name: "v8".into(),
754 },
755 Field {
756 data_type: DataType::Interval,
757 name: "v9".into(),
758 },
759 Field {
760 data_type: DataType::Struct(StructType::new(vec![
761 ("a", DataType::Timestamp),
762 ("b", DataType::Timestamptz),
763 (
764 "c",
765 DataType::Struct(StructType::new(vec![
766 ("aa", DataType::Int64),
767 ("bb", DataType::Float64),
768 ])),
769 ),
770 ])),
771 name: "v10".into(),
772 },
773 Field {
774 data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
775 StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
776 ))))),
777 name: "v11".into(),
778 },
779 Field {
780 data_type: DataType::Jsonb,
781 name: "12".into(),
782 },
783 Field {
784 data_type: DataType::Serial,
785 name: "13".into(),
786 },
787 Field {
788 data_type: DataType::Int256,
789 name: "14".into(),
790 },
791 ];
792 let schema =
793 json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
794 .to_string();
795 let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
796 assert_eq!(schema, ans);
797 }
798}