1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33 CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34 Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39 time_handling_mode: TimeHandlingMode,
40 date_handling_mode: DateHandlingMode,
41 timestamp_handling_mode: TimestampHandlingMode,
42 timestamptz_handling_mode: TimestamptzHandlingMode,
43 custom_json_type: CustomJsonType,
44 jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48 schema: Schema,
49 col_indices: Option<Vec<usize>>,
50 kafka_connect: Option<KafkaConnectParamsRef>,
51 config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55 pub fn new(
56 schema: Schema,
57 col_indices: Option<Vec<usize>>,
58 date_handling_mode: DateHandlingMode,
59 timestamp_handling_mode: TimestampHandlingMode,
60 timestamptz_handling_mode: TimestamptzHandlingMode,
61 time_handling_mode: TimeHandlingMode,
62 jsonb_handling_mode: JsonbHandlingMode,
63 ) -> Self {
64 let config = JsonEncoderConfig {
65 time_handling_mode,
66 date_handling_mode,
67 timestamp_handling_mode,
68 timestamptz_handling_mode,
69 custom_json_type: CustomJsonType::None,
70 jsonb_handling_mode,
71 };
72 Self {
73 schema,
74 col_indices,
75 kafka_connect: None,
76 config,
77 }
78 }
79
80 pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81 let config = JsonEncoderConfig {
82 time_handling_mode: TimeHandlingMode::String,
83 date_handling_mode: DateHandlingMode::String,
84 timestamp_handling_mode: TimestampHandlingMode::String,
85 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86 custom_json_type: CustomJsonType::Es,
87 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88 };
89 Self {
90 schema,
91 col_indices,
92 kafka_connect: None,
93 config,
94 }
95 }
96
97 pub fn new_with_doris(
98 schema: Schema,
99 col_indices: Option<Vec<usize>>,
100 map: HashMap<String, u8>,
101 ) -> Self {
102 let config = JsonEncoderConfig {
103 time_handling_mode: TimeHandlingMode::Milli,
104 date_handling_mode: DateHandlingMode::String,
105 timestamp_handling_mode: TimestampHandlingMode::String,
106 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107 custom_json_type: CustomJsonType::Doris(map),
108 jsonb_handling_mode: JsonbHandlingMode::String,
109 };
110 Self {
111 schema,
112 col_indices,
113 kafka_connect: None,
114 config,
115 }
116 }
117
118 pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119 let config = JsonEncoderConfig {
120 time_handling_mode: TimeHandlingMode::Milli,
121 date_handling_mode: DateHandlingMode::String,
122 timestamp_handling_mode: TimestampHandlingMode::String,
123 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124 custom_json_type: CustomJsonType::StarRocks,
125 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126 };
127 Self {
128 schema,
129 col_indices,
130 kafka_connect: None,
131 config,
132 }
133 }
134
135 pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136 Self {
137 kafka_connect: Some(Arc::new(kafka_connect)),
138 ..self
139 }
140 }
141}
142
143impl RowEncoder for JsonEncoder {
144 type Output = Map<String, Value>;
145
146 fn schema(&self) -> &Schema {
147 &self.schema
148 }
149
150 fn col_indices(&self) -> Option<&[usize]> {
151 self.col_indices.as_ref().map(Vec::as_ref)
152 }
153
154 fn encode_cols(
155 &self,
156 row: impl Row,
157 col_indices: impl Iterator<Item = usize>,
158 ) -> Result<Self::Output> {
159 let mut mappings = Map::with_capacity(self.schema.len());
160 let col_indices = col_indices.collect_vec();
161 for idx in &col_indices {
162 let field = &self.schema[*idx];
163 let key = field.name.clone();
164 let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165 .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166 mappings.insert(key, value);
167 }
168
169 Ok(if let Some(param) = &self.kafka_connect {
170 json_converter_with_schema(
171 Value::Object(mappings),
172 param.schema_name.to_owned(),
173 col_indices.into_iter().map(|i| &self.schema[i]),
174 )
175 } else {
176 mappings
177 })
178 }
179}
180
181impl SerTo<String> for Map<String, Value> {
182 fn ser_to(self) -> Result<String> {
183 Value::Object(self).ser_to()
184 }
185}
186
187impl SerTo<String> for Value {
188 fn ser_to(self) -> Result<String> {
189 Ok(self.to_string())
190 }
191}
192
193fn datum_to_json_object(
194 field: &Field,
195 datum: DatumRef<'_>,
196 config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198 let scalar_ref = match datum {
199 None => {
200 return Ok(Value::Null);
201 }
202 Some(datum) => datum,
203 };
204
205 let data_type = field.data_type();
206
207 tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209 let value = match (data_type, scalar_ref) {
210 (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211 json!(v)
212 }
213 (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214 json!(v)
215 }
216 (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217 json!(v)
218 }
219 (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220 json!(v)
221 }
222 (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223 json!(format!("{:#018x}", v.into_inner()))
225 }
226 (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227 json!(f32::from(v))
228 }
229 (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230 json!(f64::from(v))
231 }
232 (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233 json!(v)
234 }
235 (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237 CustomJsonType::Doris(map) => {
238 let s = map.get(&field.name).unwrap();
239 v.rescale(*s as u32);
240 json!(v.to_text())
241 }
242 CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243 json!(v.to_text())
244 }
245 },
246 (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247 match config.timestamptz_handling_mode {
248 TimestamptzHandlingMode::UtcString => {
249 let parsed = v.to_datetime_utc();
250 let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251 json!(v)
252 }
253 TimestamptzHandlingMode::UtcWithoutSuffix => {
254 let parsed = v.to_datetime_utc().naive_utc();
255 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256 json!(v)
257 }
258 TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259 TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260 }
261 }
262 (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263 TimeHandlingMode::Milli => {
264 json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266 }
267 TimeHandlingMode::String => {
268 let a = v.0.format("%H:%M:%S%.6f").to_string();
269 json!(a)
270 }
271 },
272 (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273 DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274 DateHandlingMode::FromEpoch => {
275 let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
276 json!(duration.num_days())
277 }
278 DateHandlingMode::String => {
279 let a = v.0.format("%Y-%m-%d").to_string();
280 json!(a)
281 }
282 },
283 (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284 match config.timestamp_handling_mode {
285 TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286 TimestampHandlingMode::String => {
287 json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288 }
289 }
290 }
291 (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292 json!(general_purpose::STANDARD.encode(v))
293 }
294 (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296 json!(v.as_iso_8601())
297 }
298
299 (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300 JsonbHandlingMode::String => {
301 json!(jsonb_ref.to_string())
302 }
303 JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304 },
305 (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306 let elems = list_ref.iter();
307 let mut vec = Vec::with_capacity(elems.len());
308 let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309 for sub_datum_ref in elems {
310 let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311 vec.push(value);
312 }
313 json!(vec)
314 }
315 (DataType::Vector(_), ScalarRefImpl::Vector(vector)) => {
316 let elems = vector.as_raw_slice();
317 let mut vec = Vec::with_capacity(elems.len());
318 for v in elems {
319 let value = serde_json::Number::from_f64(*v as _)
320 .map(Value::Number)
321 .unwrap_or(Value::Null);
322 vec.push(value);
323 }
324 json!(vec)
325 }
326 (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
327 match config.custom_json_type {
328 CustomJsonType::Doris(_) => {
329 let mut map = IndexMap::with_capacity(st.len());
331 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
332 st.iter()
333 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
334 ) {
335 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
336 map.insert(sub_field.name.clone(), value);
337 }
338 Value::String(
339 serde_json::to_string(&map).context("failed to serialize into JSON")?,
340 )
341 }
342 CustomJsonType::StarRocks => {
343 return Err(ArrayError::internal(
344 "starrocks can't support struct".to_owned(),
345 ));
346 }
347 CustomJsonType::Es | CustomJsonType::None => {
348 let mut map = Map::with_capacity(st.len());
349 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
350 st.iter()
351 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
352 ) {
353 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
354 map.insert(sub_field.name.clone(), value);
355 }
356 json!(map)
357 }
358 }
359 }
360 (data_type, scalar_ref) => {
362 return Err(ArrayError::internal(format!(
363 "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
364 field.name, data_type, scalar_ref
365 )));
366 }
367 };
368
369 Ok(value)
370}
371
372fn json_converter_with_schema<'a>(
373 object: Value,
374 name: String,
375 fields: impl Iterator<Item = &'a Field>,
376) -> Map<String, Value> {
377 let mut mapping = Map::with_capacity(2);
378 mapping.insert(
379 "schema".to_owned(),
380 json!({
381 "type": "struct",
382 "fields": fields.map(|field| {
383 let mut mapping = type_as_json_schema(&field.data_type);
384 mapping.insert("field".to_owned(), json!(field.name));
385 mapping
386 }).collect_vec(),
387 "optional": false,
388 "name": name,
389 }),
390 );
391 mapping.insert("payload".to_owned(), object);
392 mapping
393}
394
395pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
397 match rw_type {
398 DataType::Boolean => "boolean",
399 DataType::Int16 => "int16",
400 DataType::Int32 => "int32",
401 DataType::Int64 => "int64",
402 DataType::Float32 => "float",
403 DataType::Float64 => "double",
404 DataType::Decimal => "string",
405 DataType::Date => "int32",
406 DataType::Varchar => "string",
407 DataType::Time => "int64",
408 DataType::Timestamp => "int64",
409 DataType::Timestamptz => "string",
410 DataType::Interval => "string",
411 DataType::Struct(_) => "struct",
412 DataType::List(_) => "array",
413 DataType::Vector(_) => "array",
414 DataType::Bytea => "bytes",
415 DataType::Jsonb => "string",
416 DataType::Serial => "string",
417 DataType::Int256 => "string",
418 DataType::Map(_) => "map",
419 }
420}
421
422fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
423 let mut mapping = Map::with_capacity(4); mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
425 mapping.insert("optional".to_owned(), json!(true));
426 match rw_type {
427 DataType::Struct(struct_type) => {
428 let sub_fields = struct_type
429 .iter()
430 .map(|(sub_name, sub_type)| {
431 let mut sub_mapping = type_as_json_schema(sub_type);
432 sub_mapping.insert("field".to_owned(), json!(sub_name));
433 sub_mapping
434 })
435 .collect_vec();
436 mapping.insert("fields".to_owned(), json!(sub_fields));
437 }
438 DataType::List(sub_type) => {
439 mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
440 }
441 _ => {}
442 }
443
444 mapping
445}
446
#[cfg(test)]
mod tests {
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    /// Exercises `datum_to_json_object` for scalar, temporal, decimal, struct
    /// and jsonb values under several encoder configurations.
    #[test]
    fn test_to_json_basic_type() {
        // Template field; each case overrides `data_type` (and sometimes `name`).
        let mock_field = Field {
            data_type: DataType::Boolean,
            name: Default::default(),
        };

        let config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let boolean_value = datum_to_json_object(
            &Field {
                data_type: DataType::Boolean,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(boolean_value, json!(false));

        let int16_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int16,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(int16_value, json!(16));

        let int64_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int64,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        // Serial values are rendered as a quoted, zero-padded hex string.
        let serial_value = datum_to_json_object(
            &Field {
                data_type: DataType::Serial,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // Timestamptz as an RFC 3339 UTC string with microsecond precision.
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        // Timestamptz as a UTC string without the timezone suffix.
        let utc_wo_suffix_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &utc_wo_suffix_config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        // Timestamp as milliseconds.
        let timestamp_milli_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &timestamp_milli_config,
        )
        .unwrap();
        assert_eq!(ts_value, json!(1000 * 1000));

        // Timestamp as a formatted string.
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Time as milliseconds from midnight.
        let time_value = datum_to_json_object(
            &Field {
                data_type: DataType::Time,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(time_value, json!(1000 * 1000));

        let interval_value = datum_to_json_object(
            &Field {
                data_type: DataType::Interval,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        // Doris: decimals are rescaled to the scale configured for the field name.
        let mut map = HashMap::default();
        map.insert("aaa".to_owned(), 5_u8);
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(map),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let decimal = datum_to_json_object(
            &Field {
                data_type: DataType::Decimal,
                name: "aaa".to_owned(),
            },
            Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(decimal, json!("1.11111"));

        // Date as days from the common era.
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(date_value, json!(719163));

        // Date as days from the Unix epoch.
        let from_epoch_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &from_epoch_config,
        )
        .unwrap();
        assert_eq!(date_value, json!(0));

        // Date as a formatted string.
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(date_value, json!("2010-10-10"));

        // Doris: structs become a JSON string that keeps the declared field order.
        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);

        let struct_value = datum_to_json_object(
            &Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("v3", DataType::Int32),
                    ("v2", DataType::Int32),
                    ("v1", DataType::Int32),
                ])),
                ..mock_field.clone()
            },
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris_config,
        )
        .unwrap();
        assert_eq!(struct_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        // Jsonb in dynamic mode is emitted as a nested JSON value, not a string.
        let encode_jsonb_obj_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
        };
        let json_value = datum_to_json_object(
            &Field {
                data_type: DataType::Jsonb,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
            &encode_jsonb_obj_config,
        )
        .unwrap();
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    /// Checks the `schema` part of the envelope produced by
    /// `json_converter_with_schema`, including nested structs and lists.
    #[test]
    fn test_generate_json_converter_schema() {
        let fields = vec![
            Field {
                data_type: DataType::Boolean,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Int16,
                name: "v2".into(),
            },
            Field {
                data_type: DataType::Int32,
                name: "v3".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v4".into(),
            },
            Field {
                data_type: DataType::Decimal,
                name: "v5".into(),
            },
            Field {
                data_type: DataType::Date,
                name: "v6".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v7".into(),
            },
            Field {
                data_type: DataType::Time,
                name: "v8".into(),
            },
            Field {
                data_type: DataType::Interval,
                name: "v9".into(),
            },
            Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    (
                        "c",
                        DataType::Struct(StructType::new(vec![
                            ("aa", DataType::Int64),
                            ("bb", DataType::Float64),
                        ])),
                    ),
                ])),
                name: "v10".into(),
            },
            Field {
                data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
                    StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
                ))))),
                name: "v11".into(),
            },
            Field {
                data_type: DataType::Jsonb,
                name: "12".into(),
            },
            Field {
                data_type: DataType::Serial,
                name: "13".into(),
            },
            Field {
                data_type: DataType::Int256,
                name: "14".into(),
            },
        ];
        let schema =
            json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
                .to_string();
        let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, ans);
    }
}