1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33 CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34 Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39 time_handling_mode: TimeHandlingMode,
40 date_handling_mode: DateHandlingMode,
41 timestamp_handling_mode: TimestampHandlingMode,
42 timestamptz_handling_mode: TimestamptzHandlingMode,
43 custom_json_type: CustomJsonType,
44 jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48 schema: Schema,
49 col_indices: Option<Vec<usize>>,
50 kafka_connect: Option<KafkaConnectParamsRef>,
51 config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55 pub fn new(
56 schema: Schema,
57 col_indices: Option<Vec<usize>>,
58 date_handling_mode: DateHandlingMode,
59 timestamp_handling_mode: TimestampHandlingMode,
60 timestamptz_handling_mode: TimestamptzHandlingMode,
61 time_handling_mode: TimeHandlingMode,
62 jsonb_handling_mode: JsonbHandlingMode,
63 ) -> Self {
64 let config = JsonEncoderConfig {
65 time_handling_mode,
66 date_handling_mode,
67 timestamp_handling_mode,
68 timestamptz_handling_mode,
69 custom_json_type: CustomJsonType::None,
70 jsonb_handling_mode,
71 };
72 Self {
73 schema,
74 col_indices,
75 kafka_connect: None,
76 config,
77 }
78 }
79
80 pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81 let config = JsonEncoderConfig {
82 time_handling_mode: TimeHandlingMode::String,
83 date_handling_mode: DateHandlingMode::String,
84 timestamp_handling_mode: TimestampHandlingMode::String,
85 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86 custom_json_type: CustomJsonType::Es,
87 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88 };
89 Self {
90 schema,
91 col_indices,
92 kafka_connect: None,
93 config,
94 }
95 }
96
97 pub fn new_with_doris(
98 schema: Schema,
99 col_indices: Option<Vec<usize>>,
100 map: HashMap<String, u8>,
101 ) -> Self {
102 let config = JsonEncoderConfig {
103 time_handling_mode: TimeHandlingMode::Milli,
104 date_handling_mode: DateHandlingMode::String,
105 timestamp_handling_mode: TimestampHandlingMode::String,
106 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107 custom_json_type: CustomJsonType::Doris(map),
108 jsonb_handling_mode: JsonbHandlingMode::String,
109 };
110 Self {
111 schema,
112 col_indices,
113 kafka_connect: None,
114 config,
115 }
116 }
117
118 pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119 let config = JsonEncoderConfig {
120 time_handling_mode: TimeHandlingMode::Milli,
121 date_handling_mode: DateHandlingMode::String,
122 timestamp_handling_mode: TimestampHandlingMode::String,
123 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124 custom_json_type: CustomJsonType::StarRocks,
125 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126 };
127 Self {
128 schema,
129 col_indices,
130 kafka_connect: None,
131 config,
132 }
133 }
134
135 pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136 Self {
137 kafka_connect: Some(Arc::new(kafka_connect)),
138 ..self
139 }
140 }
141}
142
143impl RowEncoder for JsonEncoder {
144 type Output = Map<String, Value>;
145
146 fn schema(&self) -> &Schema {
147 &self.schema
148 }
149
150 fn col_indices(&self) -> Option<&[usize]> {
151 self.col_indices.as_ref().map(Vec::as_ref)
152 }
153
154 fn encode_cols(
155 &self,
156 row: impl Row,
157 col_indices: impl Iterator<Item = usize>,
158 ) -> Result<Self::Output> {
159 let mut mappings = Map::with_capacity(self.schema.len());
160 let col_indices = col_indices.collect_vec();
161 for idx in &col_indices {
162 let field = &self.schema[*idx];
163 let key = field.name.clone();
164 let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165 .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166 mappings.insert(key, value);
167 }
168
169 Ok(if let Some(param) = &self.kafka_connect {
170 json_converter_with_schema(
171 Value::Object(mappings),
172 param.schema_name.to_owned(),
173 col_indices.into_iter().map(|i| &self.schema[i]),
174 )
175 } else {
176 mappings
177 })
178 }
179}
180
181impl SerTo<String> for Map<String, Value> {
182 fn ser_to(self) -> Result<String> {
183 Value::Object(self).ser_to()
184 }
185}
186
187impl SerTo<String> for Value {
188 fn ser_to(self) -> Result<String> {
189 Ok(self.to_string())
190 }
191}
192
193fn datum_to_json_object(
194 field: &Field,
195 datum: DatumRef<'_>,
196 config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198 let scalar_ref = match datum {
199 None => {
200 return Ok(Value::Null);
201 }
202 Some(datum) => datum,
203 };
204
205 let data_type = field.data_type();
206
207 tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209 let value = match (data_type, scalar_ref) {
210 (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211 json!(v)
212 }
213 (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214 json!(v)
215 }
216 (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217 json!(v)
218 }
219 (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220 json!(v)
221 }
222 (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223 json!(format!("{:#018x}", v.into_inner()))
225 }
226 (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227 json!(f32::from(v))
228 }
229 (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230 json!(f64::from(v))
231 }
232 (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233 json!(v)
234 }
235 (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237 CustomJsonType::Doris(map) => {
238 let s = map.get(&field.name).unwrap();
239 v.rescale(*s as u32);
240 json!(v.to_text())
241 }
242 CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243 json!(v.to_text())
244 }
245 },
246 (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247 match config.timestamptz_handling_mode {
248 TimestamptzHandlingMode::UtcString => {
249 let parsed = v.to_datetime_utc();
250 let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251 json!(v)
252 }
253 TimestamptzHandlingMode::UtcWithoutSuffix => {
254 let parsed = v.to_datetime_utc().naive_utc();
255 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256 json!(v)
257 }
258 TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259 TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260 }
261 }
262 (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263 TimeHandlingMode::Milli => {
264 json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266 }
267 TimeHandlingMode::String => {
268 let a = v.0.format("%H:%M:%S%.6f").to_string();
269 json!(a)
270 }
271 },
272 (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273 DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274 DateHandlingMode::FromEpoch => {
275 let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
276 json!(duration.num_days())
277 }
278 DateHandlingMode::String => {
279 let a = v.0.format("%Y-%m-%d").to_string();
280 json!(a)
281 }
282 },
283 (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284 match config.timestamp_handling_mode {
285 TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286 TimestampHandlingMode::String => {
287 json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288 }
289 }
290 }
291 (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292 json!(general_purpose::STANDARD.encode(v))
293 }
294 (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296 json!(v.as_iso_8601())
297 }
298
299 (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300 JsonbHandlingMode::String => {
301 json!(jsonb_ref.to_string())
302 }
303 JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304 },
305 (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306 let elems = list_ref.iter();
307 let mut vec = Vec::with_capacity(elems.len());
308 let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309 for sub_datum_ref in elems {
310 let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311 vec.push(value);
312 }
313 json!(vec)
314 }
315 (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
316 match config.custom_json_type {
317 CustomJsonType::Doris(_) => {
318 let mut map = IndexMap::with_capacity(st.len());
320 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
321 st.iter()
322 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
323 ) {
324 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
325 map.insert(sub_field.name.clone(), value);
326 }
327 Value::String(
328 serde_json::to_string(&map).context("failed to serialize into JSON")?,
329 )
330 }
331 CustomJsonType::StarRocks => {
332 return Err(ArrayError::internal(
333 "starrocks can't support struct".to_owned(),
334 ));
335 }
336 CustomJsonType::Es | CustomJsonType::None => {
337 let mut map = Map::with_capacity(st.len());
338 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
339 st.iter()
340 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
341 ) {
342 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
343 map.insert(sub_field.name.clone(), value);
344 }
345 json!(map)
346 }
347 }
348 }
349 (data_type, scalar_ref) => {
351 return Err(ArrayError::internal(format!(
352 "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
353 field.name, data_type, scalar_ref
354 )));
355 }
356 };
357
358 Ok(value)
359}
360
361fn json_converter_with_schema<'a>(
362 object: Value,
363 name: String,
364 fields: impl Iterator<Item = &'a Field>,
365) -> Map<String, Value> {
366 let mut mapping = Map::with_capacity(2);
367 mapping.insert(
368 "schema".to_owned(),
369 json!({
370 "type": "struct",
371 "fields": fields.map(|field| {
372 let mut mapping = type_as_json_schema(&field.data_type);
373 mapping.insert("field".to_owned(), json!(field.name));
374 mapping
375 }).collect_vec(),
376 "optional": false,
377 "name": name,
378 }),
379 );
380 mapping.insert("payload".to_owned(), object);
381 mapping
382}
383
384pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
386 match rw_type {
387 DataType::Boolean => "boolean",
388 DataType::Int16 => "int16",
389 DataType::Int32 => "int32",
390 DataType::Int64 => "int64",
391 DataType::Float32 => "float",
392 DataType::Float64 => "double",
393 DataType::Decimal => "string",
394 DataType::Date => "int32",
395 DataType::Varchar => "string",
396 DataType::Time => "int64",
397 DataType::Timestamp => "int64",
398 DataType::Timestamptz => "string",
399 DataType::Interval => "string",
400 DataType::Struct(_) => "struct",
401 DataType::List(_) => "array",
402 DataType::Bytea => "bytes",
403 DataType::Jsonb => "string",
404 DataType::Serial => "string",
405 DataType::Int256 => "string",
406 DataType::Map(_) => "map",
407 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
408 }
409}
410
411fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
412 let mut mapping = Map::with_capacity(4); mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
414 mapping.insert("optional".to_owned(), json!(true));
415 match rw_type {
416 DataType::Struct(struct_type) => {
417 let sub_fields = struct_type
418 .iter()
419 .map(|(sub_name, sub_type)| {
420 let mut sub_mapping = type_as_json_schema(sub_type);
421 sub_mapping.insert("field".to_owned(), json!(sub_name));
422 sub_mapping
423 })
424 .collect_vec();
425 mapping.insert("fields".to_owned(), json!(sub_fields));
426 }
427 DataType::List(sub_type) => {
428 mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
429 }
430 _ => {}
431 }
432
433 mapping
434}
435
436#[cfg(test)]
437mod tests {
438 use risingwave_common::types::{
439 Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
440 Timestamp,
441 };
442
443 use super::*;
444
445 #[test]
446 fn test_to_json_basic_type() {
447 let mock_field = Field {
448 data_type: DataType::Boolean,
449 name: Default::default(),
450 };
451
452 let config = JsonEncoderConfig {
453 time_handling_mode: TimeHandlingMode::Milli,
454 date_handling_mode: DateHandlingMode::FromCe,
455 timestamp_handling_mode: TimestampHandlingMode::String,
456 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
457 custom_json_type: CustomJsonType::None,
458 jsonb_handling_mode: JsonbHandlingMode::String,
459 };
460
461 let boolean_value = datum_to_json_object(
462 &Field {
463 data_type: DataType::Boolean,
464 ..mock_field.clone()
465 },
466 Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
467 &config,
468 )
469 .unwrap();
470 assert_eq!(boolean_value, json!(false));
471
472 let int16_value = datum_to_json_object(
473 &Field {
474 data_type: DataType::Int16,
475 ..mock_field.clone()
476 },
477 Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
478 &config,
479 )
480 .unwrap();
481 assert_eq!(int16_value, json!(16));
482
483 let int64_value = datum_to_json_object(
484 &Field {
485 data_type: DataType::Int64,
486 ..mock_field.clone()
487 },
488 Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
489 &config,
490 )
491 .unwrap();
492 assert_eq!(
493 serde_json::to_string(&int64_value).unwrap(),
494 i64::MAX.to_string()
495 );
496
497 let serial_value = datum_to_json_object(
498 &Field {
499 data_type: DataType::Serial,
500 ..mock_field.clone()
501 },
502 Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
503 &config,
504 )
505 .unwrap();
506 assert_eq!(
507 serde_json::to_string(&serial_value).unwrap(),
508 format!("\"{:#018x}\"", i64::MAX)
509 );
510
511 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
513 let tstz_value = datum_to_json_object(
514 &Field {
515 data_type: DataType::Timestamptz,
516 ..mock_field.clone()
517 },
518 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
519 &config,
520 )
521 .unwrap();
522 assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");
523
524 let unix_wo_suffix_config = JsonEncoderConfig {
525 time_handling_mode: TimeHandlingMode::Milli,
526 date_handling_mode: DateHandlingMode::FromCe,
527 timestamp_handling_mode: TimestampHandlingMode::String,
528 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
529 custom_json_type: CustomJsonType::None,
530 jsonb_handling_mode: JsonbHandlingMode::String,
531 };
532
533 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
534 let tstz_value = datum_to_json_object(
535 &Field {
536 data_type: DataType::Timestamptz,
537 ..mock_field.clone()
538 },
539 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
540 &unix_wo_suffix_config,
541 )
542 .unwrap();
543 assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
544
545 let timestamp_milli_config = JsonEncoderConfig {
546 time_handling_mode: TimeHandlingMode::String,
547 date_handling_mode: DateHandlingMode::FromCe,
548 timestamp_handling_mode: TimestampHandlingMode::Milli,
549 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
550 custom_json_type: CustomJsonType::None,
551 jsonb_handling_mode: JsonbHandlingMode::String,
552 };
553 let ts_value = datum_to_json_object(
554 &Field {
555 data_type: DataType::Timestamp,
556 ..mock_field.clone()
557 },
558 Some(
559 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
560 .as_scalar_ref_impl(),
561 ),
562 ×tamp_milli_config,
563 )
564 .unwrap();
565 assert_eq!(ts_value, json!(1000 * 1000));
566
567 let ts_value = datum_to_json_object(
568 &Field {
569 data_type: DataType::Timestamp,
570 ..mock_field.clone()
571 },
572 Some(
573 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
574 .as_scalar_ref_impl(),
575 ),
576 &config,
577 )
578 .unwrap();
579 assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));
580
581 let time_value = datum_to_json_object(
583 &Field {
584 data_type: DataType::Time,
585 ..mock_field.clone()
586 },
587 Some(
588 ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
589 .as_scalar_ref_impl(),
590 ),
591 &config,
592 )
593 .unwrap();
594 assert_eq!(time_value, json!(1000 * 1000));
595
596 let interval_value = datum_to_json_object(
597 &Field {
598 data_type: DataType::Interval,
599 ..mock_field.clone()
600 },
601 Some(
602 ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
603 .as_scalar_ref_impl(),
604 ),
605 &config,
606 )
607 .unwrap();
608 assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
609
610 let mut map = HashMap::default();
611 map.insert("aaa".to_owned(), 5_u8);
612 let doris_config = JsonEncoderConfig {
613 time_handling_mode: TimeHandlingMode::String,
614 date_handling_mode: DateHandlingMode::String,
615 timestamp_handling_mode: TimestampHandlingMode::String,
616 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
617 custom_json_type: CustomJsonType::Doris(map),
618 jsonb_handling_mode: JsonbHandlingMode::String,
619 };
620 let decimal = datum_to_json_object(
621 &Field {
622 data_type: DataType::Decimal,
623 name: "aaa".to_owned(),
624 },
625 Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
626 &doris_config,
627 )
628 .unwrap();
629 assert_eq!(decimal, json!("1.11111"));
630
631 let date_value = datum_to_json_object(
632 &Field {
633 data_type: DataType::Date,
634 ..mock_field.clone()
635 },
636 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
637 &config,
638 )
639 .unwrap();
640 assert_eq!(date_value, json!(719163));
641
642 let from_epoch_config = JsonEncoderConfig {
643 time_handling_mode: TimeHandlingMode::String,
644 date_handling_mode: DateHandlingMode::FromEpoch,
645 timestamp_handling_mode: TimestampHandlingMode::String,
646 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
647 custom_json_type: CustomJsonType::None,
648 jsonb_handling_mode: JsonbHandlingMode::String,
649 };
650 let date_value = datum_to_json_object(
651 &Field {
652 data_type: DataType::Date,
653 ..mock_field.clone()
654 },
655 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
656 &from_epoch_config,
657 )
658 .unwrap();
659 assert_eq!(date_value, json!(0));
660
661 let doris_config = JsonEncoderConfig {
662 time_handling_mode: TimeHandlingMode::String,
663 date_handling_mode: DateHandlingMode::String,
664 timestamp_handling_mode: TimestampHandlingMode::String,
665 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
666 custom_json_type: CustomJsonType::Doris(HashMap::default()),
667 jsonb_handling_mode: JsonbHandlingMode::String,
668 };
669 let date_value = datum_to_json_object(
670 &Field {
671 data_type: DataType::Date,
672 ..mock_field.clone()
673 },
674 Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
675 &doris_config,
676 )
677 .unwrap();
678 assert_eq!(date_value, json!("2010-10-10"));
679
680 let value = StructValue::new(vec![
681 Some(3_i32.to_scalar_value()),
682 Some(2_i32.to_scalar_value()),
683 Some(1_i32.to_scalar_value()),
684 ]);
685
686 let interval_value = datum_to_json_object(
687 &Field {
688 data_type: DataType::Struct(StructType::new(vec![
689 ("v3", DataType::Int32),
690 ("v2", DataType::Int32),
691 ("v1", DataType::Int32),
692 ])),
693 ..mock_field.clone()
694 },
695 Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
696 &doris_config,
697 )
698 .unwrap();
699 assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
700
701 let encode_jsonb_obj_config = JsonEncoderConfig {
702 time_handling_mode: TimeHandlingMode::String,
703 date_handling_mode: DateHandlingMode::String,
704 timestamp_handling_mode: TimestampHandlingMode::String,
705 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
706 custom_json_type: CustomJsonType::None,
707 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
708 };
709 let json_value = datum_to_json_object(
710 &Field {
711 data_type: DataType::Jsonb,
712 ..mock_field.clone()
713 },
714 Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
715 &encode_jsonb_obj_config,
716 )
717 .unwrap();
718 assert_eq!(json_value, json!([1, 2, 3]));
719 }
720
721 #[test]
722 fn test_generate_json_converter_schema() {
723 let fields = vec![
724 Field {
725 data_type: DataType::Boolean,
726 name: "v1".into(),
727 },
728 Field {
729 data_type: DataType::Int16,
730 name: "v2".into(),
731 },
732 Field {
733 data_type: DataType::Int32,
734 name: "v3".into(),
735 },
736 Field {
737 data_type: DataType::Float32,
738 name: "v4".into(),
739 },
740 Field {
741 data_type: DataType::Decimal,
742 name: "v5".into(),
743 },
744 Field {
745 data_type: DataType::Date,
746 name: "v6".into(),
747 },
748 Field {
749 data_type: DataType::Varchar,
750 name: "v7".into(),
751 },
752 Field {
753 data_type: DataType::Time,
754 name: "v8".into(),
755 },
756 Field {
757 data_type: DataType::Interval,
758 name: "v9".into(),
759 },
760 Field {
761 data_type: DataType::Struct(StructType::new(vec![
762 ("a", DataType::Timestamp),
763 ("b", DataType::Timestamptz),
764 (
765 "c",
766 DataType::Struct(StructType::new(vec![
767 ("aa", DataType::Int64),
768 ("bb", DataType::Float64),
769 ])),
770 ),
771 ])),
772 name: "v10".into(),
773 },
774 Field {
775 data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
776 StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
777 ))))),
778 name: "v11".into(),
779 },
780 Field {
781 data_type: DataType::Jsonb,
782 name: "12".into(),
783 },
784 Field {
785 data_type: DataType::Serial,
786 name: "13".into(),
787 },
788 Field {
789 data_type: DataType::Int256,
790 name: "14".into(),
791 },
792 ];
793 let schema =
794 json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
795 .to_string();
796 let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
797 assert_eq!(schema, ans);
798 }
799}