1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33 CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34 Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39 time_handling_mode: TimeHandlingMode,
40 date_handling_mode: DateHandlingMode,
41 timestamp_handling_mode: TimestampHandlingMode,
42 timestamptz_handling_mode: TimestamptzHandlingMode,
43 custom_json_type: CustomJsonType,
44 jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48 schema: Schema,
49 col_indices: Option<Vec<usize>>,
50 kafka_connect: Option<KafkaConnectParamsRef>,
51 config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55 pub fn new(
56 schema: Schema,
57 col_indices: Option<Vec<usize>>,
58 date_handling_mode: DateHandlingMode,
59 timestamp_handling_mode: TimestampHandlingMode,
60 timestamptz_handling_mode: TimestamptzHandlingMode,
61 time_handling_mode: TimeHandlingMode,
62 jsonb_handling_mode: JsonbHandlingMode,
63 ) -> Self {
64 let config = JsonEncoderConfig {
65 time_handling_mode,
66 date_handling_mode,
67 timestamp_handling_mode,
68 timestamptz_handling_mode,
69 custom_json_type: CustomJsonType::None,
70 jsonb_handling_mode,
71 };
72 Self {
73 schema,
74 col_indices,
75 kafka_connect: None,
76 config,
77 }
78 }
79
80 pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81 let config = JsonEncoderConfig {
82 time_handling_mode: TimeHandlingMode::String,
83 date_handling_mode: DateHandlingMode::String,
84 timestamp_handling_mode: TimestampHandlingMode::String,
85 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86 custom_json_type: CustomJsonType::Es,
87 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88 };
89 Self {
90 schema,
91 col_indices,
92 kafka_connect: None,
93 config,
94 }
95 }
96
97 pub fn new_with_doris(
98 schema: Schema,
99 col_indices: Option<Vec<usize>>,
100 map: HashMap<String, u8>,
101 ) -> Self {
102 let config = JsonEncoderConfig {
103 time_handling_mode: TimeHandlingMode::Milli,
104 date_handling_mode: DateHandlingMode::String,
105 timestamp_handling_mode: TimestampHandlingMode::String,
106 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107 custom_json_type: CustomJsonType::Doris(map),
108 jsonb_handling_mode: JsonbHandlingMode::String,
109 };
110 Self {
111 schema,
112 col_indices,
113 kafka_connect: None,
114 config,
115 }
116 }
117
118 pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119 let config = JsonEncoderConfig {
120 time_handling_mode: TimeHandlingMode::Milli,
121 date_handling_mode: DateHandlingMode::String,
122 timestamp_handling_mode: TimestampHandlingMode::String,
123 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124 custom_json_type: CustomJsonType::StarRocks,
125 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126 };
127 Self {
128 schema,
129 col_indices,
130 kafka_connect: None,
131 config,
132 }
133 }
134
135 pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136 Self {
137 kafka_connect: Some(Arc::new(kafka_connect)),
138 ..self
139 }
140 }
141}
142
143impl RowEncoder for JsonEncoder {
144 type Output = Map<String, Value>;
145
146 fn schema(&self) -> &Schema {
147 &self.schema
148 }
149
150 fn col_indices(&self) -> Option<&[usize]> {
151 self.col_indices.as_ref().map(Vec::as_ref)
152 }
153
154 fn encode_cols(
155 &self,
156 row: impl Row,
157 col_indices: impl Iterator<Item = usize>,
158 ) -> Result<Self::Output> {
159 let mut mappings = Map::with_capacity(self.schema.len());
160 let col_indices = col_indices.collect_vec();
161 for idx in &col_indices {
162 let field = &self.schema[*idx];
163 let key = field.name.clone();
164 let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165 .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166 mappings.insert(key, value);
167 }
168
169 Ok(if let Some(param) = &self.kafka_connect {
170 json_converter_with_schema(
171 Value::Object(mappings),
172 param.schema_name.to_owned(),
173 col_indices.into_iter().map(|i| &self.schema[i]),
174 )
175 } else {
176 mappings
177 })
178 }
179}
180
181impl SerTo<String> for Map<String, Value> {
182 fn ser_to(self) -> Result<String> {
183 Value::Object(self).ser_to()
184 }
185}
186
187impl SerTo<String> for Value {
188 fn ser_to(self) -> Result<String> {
189 Ok(self.to_string())
190 }
191}
192
193fn datum_to_json_object(
194 field: &Field,
195 datum: DatumRef<'_>,
196 config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198 let scalar_ref = match datum {
199 None => {
200 return Ok(Value::Null);
201 }
202 Some(datum) => datum,
203 };
204
205 let data_type = field.data_type();
206
207 tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209 let value = match (data_type, scalar_ref) {
210 (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211 json!(v)
212 }
213 (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214 json!(v)
215 }
216 (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217 json!(v)
218 }
219 (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220 json!(v)
221 }
222 (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223 json!(format!("{:#018x}", v.into_inner()))
225 }
226 (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227 json!(f32::from(v))
228 }
229 (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230 json!(f64::from(v))
231 }
232 (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233 json!(v)
234 }
235 (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237 CustomJsonType::Doris(map) => {
238 let s = map.get(&field.name).unwrap();
239 v.rescale(*s as u32);
240 json!(v.to_text())
241 }
242 CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243 json!(v.to_text())
244 }
245 },
246 (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247 match config.timestamptz_handling_mode {
248 TimestamptzHandlingMode::UtcString => {
249 let parsed = v.to_datetime_utc();
250 let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251 json!(v)
252 }
253 TimestamptzHandlingMode::UtcWithoutSuffix => {
254 let parsed = v.to_datetime_utc().naive_utc();
255 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256 json!(v)
257 }
258 TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259 TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260 }
261 }
262 (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263 TimeHandlingMode::Milli => {
264 json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266 }
267 TimeHandlingMode::String => {
268 let a = v.0.format("%H:%M:%S%.6f").to_string();
269 json!(a)
270 }
271 },
272 (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273 DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274 DateHandlingMode::FromEpoch => {
275 let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
276 json!(duration.num_days())
277 }
278 DateHandlingMode::String => {
279 let a = v.0.format("%Y-%m-%d").to_string();
280 json!(a)
281 }
282 },
283 (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284 match config.timestamp_handling_mode {
285 TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286 TimestampHandlingMode::String => {
287 json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288 }
289 }
290 }
291 (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292 json!(general_purpose::STANDARD.encode(v))
293 }
294 (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296 json!(v.as_iso_8601())
297 }
298
299 (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300 JsonbHandlingMode::String => {
301 json!(jsonb_ref.to_string())
302 }
303 JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304 },
305 (DataType::List(lt), ScalarRefImpl::List(list_ref)) => {
306 let elems = list_ref.iter();
307 let mut vec = Vec::with_capacity(elems.len());
308 let inner_field = Field::unnamed(lt.into_elem());
309 for sub_datum_ref in elems {
310 let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311 vec.push(value);
312 }
313 json!(vec)
314 }
315 (DataType::Vector(_), ScalarRefImpl::Vector(vector)) => {
316 let elems = vector.as_raw_slice();
317 let mut vec = Vec::with_capacity(elems.len());
318 for v in elems {
319 let value = serde_json::Number::from_f64(*v as _)
320 .map(Value::Number)
321 .unwrap_or(Value::Null);
322 vec.push(value);
323 }
324 json!(vec)
325 }
326 (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
327 match config.custom_json_type {
328 CustomJsonType::Doris(_) => {
329 let mut map = IndexMap::with_capacity(st.len());
331 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
332 st.iter()
333 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
334 ) {
335 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
336 map.insert(sub_field.name.clone(), value);
337 }
338 Value::String(
339 serde_json::to_string(&map).context("failed to serialize into JSON")?,
340 )
341 }
342 CustomJsonType::StarRocks => {
343 return Err(ArrayError::internal(
344 "starrocks can't support struct".to_owned(),
345 ));
346 }
347 CustomJsonType::Es | CustomJsonType::None => {
348 let mut map = Map::with_capacity(st.len());
349 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
350 st.iter()
351 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
352 ) {
353 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
354 map.insert(sub_field.name.clone(), value);
355 }
356 json!(map)
357 }
358 }
359 }
360 (data_type, scalar_ref) => {
362 return Err(ArrayError::internal(format!(
363 "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
364 field.name, data_type, scalar_ref
365 )));
366 }
367 };
368
369 Ok(value)
370}
371
372fn json_converter_with_schema<'a>(
373 object: Value,
374 name: String,
375 fields: impl Iterator<Item = &'a Field>,
376) -> Map<String, Value> {
377 let mut mapping = Map::with_capacity(2);
378 mapping.insert(
379 "schema".to_owned(),
380 json!({
381 "type": "struct",
382 "fields": fields.map(|field| {
383 let mut mapping = type_as_json_schema(&field.data_type);
384 mapping.insert("field".to_owned(), json!(field.name));
385 mapping
386 }).collect_vec(),
387 "optional": false,
388 "name": name,
389 }),
390 );
391 mapping.insert("payload".to_owned(), object);
392 mapping
393}
394
395pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
397 match rw_type {
398 DataType::Boolean => "boolean",
399 DataType::Int16 => "int16",
400 DataType::Int32 => "int32",
401 DataType::Int64 => "int64",
402 DataType::Float32 => "float",
403 DataType::Float64 => "double",
404 DataType::Decimal => "string",
405 DataType::Date => "int32",
406 DataType::Varchar => "string",
407 DataType::Time => "int64",
408 DataType::Timestamp => "int64",
409 DataType::Timestamptz => "string",
410 DataType::Interval => "string",
411 DataType::Struct(_) => "struct",
412 DataType::List(_) => "array",
413 DataType::Vector(_) => "array",
414 DataType::Bytea => "bytes",
415 DataType::Jsonb => "string",
416 DataType::Serial => "string",
417 DataType::Int256 => "string",
418 DataType::Map(_) => "map",
419 }
420}
421
422fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
423 let mut mapping = Map::with_capacity(4); mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
425 mapping.insert("optional".to_owned(), json!(true));
426 match rw_type {
427 DataType::Struct(struct_type) => {
428 let sub_fields = struct_type
429 .iter()
430 .map(|(sub_name, sub_type)| {
431 let mut sub_mapping = type_as_json_schema(sub_type);
432 sub_mapping.insert("field".to_owned(), json!(sub_name));
433 sub_mapping
434 })
435 .collect_vec();
436 mapping.insert("fields".to_owned(), json!(sub_fields));
437 }
438 DataType::List(list_type) => {
439 mapping.insert(
440 "items".to_owned(),
441 json!(type_as_json_schema(list_type.elem())),
442 );
443 }
444 _ => {}
445 }
446
447 mapping
448}
449
450#[cfg(test)]
451mod tests {
452 use risingwave_common::types::{
453 Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
454 Timestamp,
455 };
456
457 use super::*;
458
459 #[test]
460 fn test_to_json_basic_type() {
461 let mock_field = Field {
462 data_type: DataType::Boolean,
463 name: Default::default(),
464 };
465
466 let config = JsonEncoderConfig {
467 time_handling_mode: TimeHandlingMode::Milli,
468 date_handling_mode: DateHandlingMode::FromCe,
469 timestamp_handling_mode: TimestampHandlingMode::String,
470 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
471 custom_json_type: CustomJsonType::None,
472 jsonb_handling_mode: JsonbHandlingMode::String,
473 };
474
475 let boolean_value = datum_to_json_object(
476 &Field {
477 data_type: DataType::Boolean,
478 ..mock_field.clone()
479 },
480 Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
481 &config,
482 )
483 .unwrap();
484 assert_eq!(boolean_value, json!(false));
485
486 let int16_value = datum_to_json_object(
487 &Field {
488 data_type: DataType::Int16,
489 ..mock_field.clone()
490 },
491 Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
492 &config,
493 )
494 .unwrap();
495 assert_eq!(int16_value, json!(16));
496
497 let int64_value = datum_to_json_object(
498 &Field {
499 data_type: DataType::Int64,
500 ..mock_field.clone()
501 },
502 Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
503 &config,
504 )
505 .unwrap();
506 assert_eq!(
507 serde_json::to_string(&int64_value).unwrap(),
508 i64::MAX.to_string()
509 );
510
511 let serial_value = datum_to_json_object(
512 &Field {
513 data_type: DataType::Serial,
514 ..mock_field.clone()
515 },
516 Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
517 &config,
518 )
519 .unwrap();
520 assert_eq!(
521 serde_json::to_string(&serial_value).unwrap(),
522 format!("\"{:#018x}\"", i64::MAX)
523 );
524
525 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
527 let tstz_value = datum_to_json_object(
528 &Field {
529 data_type: DataType::Timestamptz,
530 ..mock_field.clone()
531 },
532 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
533 &config,
534 )
535 .unwrap();
536 assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");
537
538 let unix_wo_suffix_config = JsonEncoderConfig {
539 time_handling_mode: TimeHandlingMode::Milli,
540 date_handling_mode: DateHandlingMode::FromCe,
541 timestamp_handling_mode: TimestampHandlingMode::String,
542 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
543 custom_json_type: CustomJsonType::None,
544 jsonb_handling_mode: JsonbHandlingMode::String,
545 };
546
547 let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
548 let tstz_value = datum_to_json_object(
549 &Field {
550 data_type: DataType::Timestamptz,
551 ..mock_field.clone()
552 },
553 Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
554 &unix_wo_suffix_config,
555 )
556 .unwrap();
557 assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
558
559 let timestamp_milli_config = JsonEncoderConfig {
560 time_handling_mode: TimeHandlingMode::String,
561 date_handling_mode: DateHandlingMode::FromCe,
562 timestamp_handling_mode: TimestampHandlingMode::Milli,
563 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
564 custom_json_type: CustomJsonType::None,
565 jsonb_handling_mode: JsonbHandlingMode::String,
566 };
567 let ts_value = datum_to_json_object(
568 &Field {
569 data_type: DataType::Timestamp,
570 ..mock_field.clone()
571 },
572 Some(
573 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
574 .as_scalar_ref_impl(),
575 ),
576 ×tamp_milli_config,
577 )
578 .unwrap();
579 assert_eq!(ts_value, json!(1000 * 1000));
580
581 let ts_value = datum_to_json_object(
582 &Field {
583 data_type: DataType::Timestamp,
584 ..mock_field.clone()
585 },
586 Some(
587 ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
588 .as_scalar_ref_impl(),
589 ),
590 &config,
591 )
592 .unwrap();
593 assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));
594
595 let time_value = datum_to_json_object(
597 &Field {
598 data_type: DataType::Time,
599 ..mock_field.clone()
600 },
601 Some(
602 ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
603 .as_scalar_ref_impl(),
604 ),
605 &config,
606 )
607 .unwrap();
608 assert_eq!(time_value, json!(1000 * 1000));
609
610 let interval_value = datum_to_json_object(
611 &Field {
612 data_type: DataType::Interval,
613 ..mock_field.clone()
614 },
615 Some(
616 ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
617 .as_scalar_ref_impl(),
618 ),
619 &config,
620 )
621 .unwrap();
622 assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
623
624 let mut map = HashMap::default();
625 map.insert("aaa".to_owned(), 5_u8);
626 let doris_config = JsonEncoderConfig {
627 time_handling_mode: TimeHandlingMode::String,
628 date_handling_mode: DateHandlingMode::String,
629 timestamp_handling_mode: TimestampHandlingMode::String,
630 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
631 custom_json_type: CustomJsonType::Doris(map),
632 jsonb_handling_mode: JsonbHandlingMode::String,
633 };
634 let decimal = datum_to_json_object(
635 &Field {
636 data_type: DataType::Decimal,
637 name: "aaa".to_owned(),
638 },
639 Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
640 &doris_config,
641 )
642 .unwrap();
643 assert_eq!(decimal, json!("1.11111"));
644
645 let date_value = datum_to_json_object(
646 &Field {
647 data_type: DataType::Date,
648 ..mock_field.clone()
649 },
650 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
651 &config,
652 )
653 .unwrap();
654 assert_eq!(date_value, json!(719163));
655
656 let from_epoch_config = JsonEncoderConfig {
657 time_handling_mode: TimeHandlingMode::String,
658 date_handling_mode: DateHandlingMode::FromEpoch,
659 timestamp_handling_mode: TimestampHandlingMode::String,
660 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
661 custom_json_type: CustomJsonType::None,
662 jsonb_handling_mode: JsonbHandlingMode::String,
663 };
664 let date_value = datum_to_json_object(
665 &Field {
666 data_type: DataType::Date,
667 ..mock_field.clone()
668 },
669 Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
670 &from_epoch_config,
671 )
672 .unwrap();
673 assert_eq!(date_value, json!(0));
674
675 let doris_config = JsonEncoderConfig {
676 time_handling_mode: TimeHandlingMode::String,
677 date_handling_mode: DateHandlingMode::String,
678 timestamp_handling_mode: TimestampHandlingMode::String,
679 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
680 custom_json_type: CustomJsonType::Doris(HashMap::default()),
681 jsonb_handling_mode: JsonbHandlingMode::String,
682 };
683 let date_value = datum_to_json_object(
684 &Field {
685 data_type: DataType::Date,
686 ..mock_field.clone()
687 },
688 Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
689 &doris_config,
690 )
691 .unwrap();
692 assert_eq!(date_value, json!("2010-10-10"));
693
694 let value = StructValue::new(vec![
695 Some(3_i32.to_scalar_value()),
696 Some(2_i32.to_scalar_value()),
697 Some(1_i32.to_scalar_value()),
698 ]);
699
700 let interval_value = datum_to_json_object(
701 &Field {
702 data_type: DataType::Struct(StructType::new(vec![
703 ("v3", DataType::Int32),
704 ("v2", DataType::Int32),
705 ("v1", DataType::Int32),
706 ])),
707 ..mock_field.clone()
708 },
709 Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
710 &doris_config,
711 )
712 .unwrap();
713 assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));
714
715 let encode_jsonb_obj_config = JsonEncoderConfig {
716 time_handling_mode: TimeHandlingMode::String,
717 date_handling_mode: DateHandlingMode::String,
718 timestamp_handling_mode: TimestampHandlingMode::String,
719 timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
720 custom_json_type: CustomJsonType::None,
721 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
722 };
723 let json_value = datum_to_json_object(
724 &Field {
725 data_type: DataType::Jsonb,
726 ..mock_field.clone()
727 },
728 Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
729 &encode_jsonb_obj_config,
730 )
731 .unwrap();
732 assert_eq!(json_value, json!([1, 2, 3]));
733 }
734
735 #[test]
736 fn test_generate_json_converter_schema() {
737 let fields = vec![
738 Field {
739 data_type: DataType::Boolean,
740 name: "v1".into(),
741 },
742 Field {
743 data_type: DataType::Int16,
744 name: "v2".into(),
745 },
746 Field {
747 data_type: DataType::Int32,
748 name: "v3".into(),
749 },
750 Field {
751 data_type: DataType::Float32,
752 name: "v4".into(),
753 },
754 Field {
755 data_type: DataType::Decimal,
756 name: "v5".into(),
757 },
758 Field {
759 data_type: DataType::Date,
760 name: "v6".into(),
761 },
762 Field {
763 data_type: DataType::Varchar,
764 name: "v7".into(),
765 },
766 Field {
767 data_type: DataType::Time,
768 name: "v8".into(),
769 },
770 Field {
771 data_type: DataType::Interval,
772 name: "v9".into(),
773 },
774 Field {
775 data_type: DataType::Struct(StructType::new(vec![
776 ("a", DataType::Timestamp),
777 ("b", DataType::Timestamptz),
778 (
779 "c",
780 DataType::Struct(StructType::new(vec![
781 ("aa", DataType::Int64),
782 ("bb", DataType::Float64),
783 ])),
784 ),
785 ])),
786 name: "v10".into(),
787 },
788 Field {
789 data_type: DataType::list(DataType::list(DataType::Struct(StructType::new(vec![
790 ("aa", DataType::Int64),
791 ("bb", DataType::Float64),
792 ])))),
793 name: "v11".into(),
794 },
795 Field {
796 data_type: DataType::Jsonb,
797 name: "12".into(),
798 },
799 Field {
800 data_type: DataType::Serial,
801 name: "13".into(),
802 },
803 Field {
804 data_type: DataType::Int256,
805 name: "14".into(),
806 },
807 ];
808 let schema =
809 json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
810 .to_string();
811 let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
812 assert_eq!(schema, ans);
813 }
814}