1use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use chrono_tz::Tz;
23use indexmap::IndexMap;
24use itertools::Itertools;
25use risingwave_common::array::{ArrayError, ArrayResult};
26use risingwave_common::catalog::{Field, Schema};
27use risingwave_common::row::Row;
28use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
29use risingwave_common::util::iter_util::ZipEqDebug;
30use serde_json::{Map, Value, json};
31use thiserror_ext::AsReport;
32
33use super::{
34 CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
35 Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
36};
37use crate::sink::SinkError;
38
39pub struct JsonEncoderConfig {
40 time_handling_mode: TimeHandlingMode,
41 date_handling_mode: DateHandlingMode,
42 timestamp_handling_mode: TimestampHandlingMode,
43 timestamptz_handling_mode: TimestamptzHandlingMode,
44 custom_json_type: CustomJsonType,
45 jsonb_handling_mode: JsonbHandlingMode,
46}
47
48pub struct JsonEncoder {
49 schema: Schema,
50 col_indices: Option<Vec<usize>>,
51 kafka_connect: Option<KafkaConnectParamsRef>,
52 config: JsonEncoderConfig,
53}
54
55impl JsonEncoder {
56 pub fn new(
57 schema: Schema,
58 col_indices: Option<Vec<usize>>,
59 date_handling_mode: DateHandlingMode,
60 timestamp_handling_mode: TimestampHandlingMode,
61 timestamptz_handling_mode: TimestamptzHandlingMode,
62 time_handling_mode: TimeHandlingMode,
63 jsonb_handling_mode: JsonbHandlingMode,
64 ) -> Self {
65 let config = JsonEncoderConfig {
66 time_handling_mode,
67 date_handling_mode,
68 timestamp_handling_mode,
69 timestamptz_handling_mode,
70 custom_json_type: CustomJsonType::None,
71 jsonb_handling_mode,
72 };
73 Self {
74 schema,
75 col_indices,
76 kafka_connect: None,
77 config,
78 }
79 }
80
81 pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
82 let config = JsonEncoderConfig {
83 time_handling_mode: TimeHandlingMode::String,
84 date_handling_mode: DateHandlingMode::String,
85 timestamp_handling_mode: TimestampHandlingMode::String,
86 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
87 custom_json_type: CustomJsonType::Es,
88 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
89 };
90 Self {
91 schema,
92 col_indices,
93 kafka_connect: None,
94 config,
95 }
96 }
97
98 pub fn new_with_doris(
99 schema: Schema,
100 col_indices: Option<Vec<usize>>,
101 map: HashMap<String, u8>,
102 ) -> Self {
103 let config = JsonEncoderConfig {
104 time_handling_mode: TimeHandlingMode::Milli,
105 date_handling_mode: DateHandlingMode::String,
106 timestamp_handling_mode: TimestampHandlingMode::String,
107 timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
108 custom_json_type: CustomJsonType::Doris(map),
109 jsonb_handling_mode: JsonbHandlingMode::String,
110 };
111 Self {
112 schema,
113 col_indices,
114 kafka_connect: None,
115 config,
116 }
117 }
118
119 pub fn new_with_starrocks(
120 schema: Schema,
121 col_indices: Option<Vec<usize>>,
122 time_zone: Tz,
123 ) -> Self {
124 let config = JsonEncoderConfig {
125 time_handling_mode: TimeHandlingMode::Milli,
126 date_handling_mode: DateHandlingMode::String,
127 timestamp_handling_mode: TimestampHandlingMode::String,
128 timestamptz_handling_mode: TimestamptzHandlingMode::SpecifiedTimezoneWithoutSuffix(
129 time_zone,
130 ),
131 custom_json_type: CustomJsonType::StarRocks,
132 jsonb_handling_mode: JsonbHandlingMode::Dynamic,
133 };
134 Self {
135 schema,
136 col_indices,
137 kafka_connect: None,
138 config,
139 }
140 }
141
142 pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
143 Self {
144 kafka_connect: Some(Arc::new(kafka_connect)),
145 ..self
146 }
147 }
148}
149
150impl RowEncoder for JsonEncoder {
151 type Output = Map<String, Value>;
152
153 fn schema(&self) -> &Schema {
154 &self.schema
155 }
156
157 fn col_indices(&self) -> Option<&[usize]> {
158 self.col_indices.as_ref().map(Vec::as_ref)
159 }
160
161 fn encode_cols(
162 &self,
163 row: impl Row,
164 col_indices: impl Iterator<Item = usize>,
165 ) -> Result<Self::Output> {
166 let mut mappings = Map::with_capacity(self.schema.len());
167 let col_indices = col_indices.collect_vec();
168 for idx in &col_indices {
169 let field = &self.schema[*idx];
170 let key = field.name.clone();
171 let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
172 .map_err(|e| SinkError::Encode(e.to_report_string()))?;
173 mappings.insert(key, value);
174 }
175
176 Ok(if let Some(param) = &self.kafka_connect {
177 json_converter_with_schema(
178 Value::Object(mappings),
179 param.schema_name.clone(),
180 col_indices.into_iter().map(|i| &self.schema[i]),
181 )
182 } else {
183 mappings
184 })
185 }
186}
187
188impl SerTo<String> for Map<String, Value> {
189 fn ser_to(self) -> Result<String> {
190 Value::Object(self).ser_to()
191 }
192}
193
194impl SerTo<String> for Value {
195 fn ser_to(self) -> Result<String> {
196 Ok(self.to_string())
197 }
198}
199
200fn datum_to_json_object(
201 field: &Field,
202 datum: DatumRef<'_>,
203 config: &JsonEncoderConfig,
204) -> ArrayResult<Value> {
205 let scalar_ref = match datum {
206 None => {
207 return Ok(Value::Null);
208 }
209 Some(datum) => datum,
210 };
211
212 let data_type = field.data_type();
213
214 tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
215
216 let value = match (data_type, scalar_ref) {
217 (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
218 json!(v)
219 }
220 (DataType::Int16, ScalarRefImpl::Int16(v)) => {
221 json!(v)
222 }
223 (DataType::Int32, ScalarRefImpl::Int32(v)) => {
224 json!(v)
225 }
226 (DataType::Int64, ScalarRefImpl::Int64(v)) => {
227 json!(v)
228 }
229 (DataType::Serial, ScalarRefImpl::Serial(v)) => {
230 json!(format!("{:#018x}", v.into_inner()))
232 }
233 (DataType::Float32, ScalarRefImpl::Float32(v)) => {
234 json!(f32::from(v))
235 }
236 (DataType::Float64, ScalarRefImpl::Float64(v)) => {
237 json!(f64::from(v))
238 }
239 (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
240 json!(v)
241 }
242 (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
244 CustomJsonType::Doris(map) => {
245 let s = map.get(&field.name).unwrap();
246 v.rescale(*s as u32);
247 json!(v.to_text())
248 }
249 CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
250 json!(v.to_text())
251 }
252 },
253 (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
254 match config.timestamptz_handling_mode {
255 TimestamptzHandlingMode::UtcString => {
256 let parsed = v.to_datetime_utc();
257 let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
258 json!(v)
259 }
260 TimestamptzHandlingMode::UtcWithoutSuffix => {
261 let parsed = v.to_datetime_utc().naive_utc();
262 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
263 json!(v)
264 }
265 TimestamptzHandlingMode::SpecifiedTimezoneWithoutSuffix(time_zone) => {
266 let parsed = v.to_datetime_in_zone(time_zone).naive_local();
267 let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
268 json!(v)
269 }
270 TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
271 TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
272 }
273 }
274 (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
275 TimeHandlingMode::Milli => {
276 json!(v.0.num_seconds_from_midnight() as i64 * 1000)
278 }
279 TimeHandlingMode::String => {
280 let a = v.0.format("%H:%M:%S%.6f").to_string();
281 json!(a)
282 }
283 },
284 (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
285 DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
286 DateHandlingMode::FromEpoch => {
287 let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
288 json!(duration.num_days())
289 }
290 DateHandlingMode::String => {
291 let a = v.0.format("%Y-%m-%d").to_string();
292 json!(a)
293 }
294 },
295 (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
296 match config.timestamp_handling_mode {
297 TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
298 TimestampHandlingMode::String => {
299 json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
300 }
301 }
302 }
303 (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
304 json!(general_purpose::STANDARD.encode(v))
305 }
306 (DataType::Interval, ScalarRefImpl::Interval(v)) => {
308 json!(v.as_iso_8601())
309 }
310
311 (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
312 JsonbHandlingMode::String => {
313 json!(jsonb_ref.to_string())
314 }
315 JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
316 },
317 (DataType::List(lt), ScalarRefImpl::List(list_ref)) => {
318 let elems = list_ref.iter();
319 let mut vec = Vec::with_capacity(elems.len());
320 let inner_field = Field::unnamed(lt.into_elem());
321 for sub_datum_ref in elems {
322 let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
323 vec.push(value);
324 }
325 json!(vec)
326 }
327 (DataType::Vector(_), ScalarRefImpl::Vector(vector)) => {
328 let elems = vector.as_raw_slice();
329 let mut vec = Vec::with_capacity(elems.len());
330 for v in elems {
331 let value = serde_json::Number::from_f64(*v as _)
332 .map(Value::Number)
333 .unwrap_or(Value::Null);
334 vec.push(value);
335 }
336 json!(vec)
337 }
338 (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
339 match config.custom_json_type {
340 CustomJsonType::Doris(_) => {
341 let mut map = IndexMap::with_capacity(st.len());
343 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
344 st.iter()
345 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
346 ) {
347 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
348 map.insert(sub_field.name.clone(), value);
349 }
350 Value::String(
351 serde_json::to_string(&map).context("failed to serialize into JSON")?,
352 )
353 }
354 CustomJsonType::StarRocks => {
355 return Err(ArrayError::internal(
356 "starrocks can't support struct".to_owned(),
357 ));
358 }
359 CustomJsonType::Es | CustomJsonType::None => {
360 let mut map = Map::with_capacity(st.len());
361 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
362 st.iter()
363 .map(|(name, dt)| Field::with_name(dt.clone(), name)),
364 ) {
365 let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
366 map.insert(sub_field.name.clone(), value);
367 }
368 json!(map)
369 }
370 }
371 }
372 (data_type, scalar_ref) => {
374 return Err(ArrayError::internal(format!(
375 "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
376 field.name, data_type, scalar_ref
377 )));
378 }
379 };
380
381 Ok(value)
382}
383
384fn json_converter_with_schema<'a>(
385 object: Value,
386 name: String,
387 fields: impl Iterator<Item = &'a Field>,
388) -> Map<String, Value> {
389 let mut mapping = Map::with_capacity(2);
390 mapping.insert(
391 "schema".to_owned(),
392 json!({
393 "type": "struct",
394 "fields": fields.map(|field| {
395 let mut mapping = type_as_json_schema(&field.data_type);
396 mapping.insert("field".to_owned(), json!(field.name));
397 mapping
398 }).collect_vec(),
399 "optional": false,
400 "name": name,
401 }),
402 );
403 mapping.insert("payload".to_owned(), object);
404 mapping
405}
406
407pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
409 match rw_type {
410 DataType::Boolean => "boolean",
411 DataType::Int16 => "int16",
412 DataType::Int32 => "int32",
413 DataType::Int64 => "int64",
414 DataType::Float32 => "float",
415 DataType::Float64 => "double",
416 DataType::Decimal => "string",
417 DataType::Date => "int32",
418 DataType::Varchar => "string",
419 DataType::Time => "int64",
420 DataType::Timestamp => "int64",
421 DataType::Timestamptz => "string",
422 DataType::Interval => "string",
423 DataType::Struct(_) => "struct",
424 DataType::List(_) => "array",
425 DataType::Vector(_) => "array",
426 DataType::Bytea => "bytes",
427 DataType::Jsonb => "string",
428 DataType::Serial => "string",
429 DataType::Int256 => "string",
430 DataType::Map(_) => "map",
431 }
432}
433
434fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
435 let mut mapping = Map::with_capacity(4); mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
437 mapping.insert("optional".to_owned(), json!(true));
438 match rw_type {
439 DataType::Struct(struct_type) => {
440 let sub_fields = struct_type
441 .iter()
442 .map(|(sub_name, sub_type)| {
443 let mut sub_mapping = type_as_json_schema(sub_type);
444 sub_mapping.insert("field".to_owned(), json!(sub_name));
445 sub_mapping
446 })
447 .collect_vec();
448 mapping.insert("fields".to_owned(), json!(sub_fields));
449 }
450 DataType::List(list_type) => {
451 mapping.insert(
452 "items".to_owned(),
453 json!(type_as_json_schema(list_type.elem())),
454 );
455 }
456 _ => {}
457 }
458
459 mapping
460}
461
#[cfg(test)]
mod tests {
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    /// Encodes `scalar` as a column of type `data_type` named `name`, panicking on failure.
    fn encode_scalar(
        data_type: DataType,
        name: &str,
        scalar: ScalarImpl,
        config: &JsonEncoderConfig,
    ) -> Value {
        let field = Field {
            data_type,
            name: name.to_owned(),
        };
        datum_to_json_object(&field, Some(scalar.as_scalar_ref_impl()), config).unwrap()
    }

    /// StarRocks encoders render timestamptz in the configured zone, without a suffix.
    #[test]
    fn test_starrocks_timestamptz_encoding() {
        let schema = Schema::new(vec![Field::with_name(DataType::Timestamptz, "ts")]);
        let encoder = JsonEncoder::new_with_starrocks(schema, None, chrono_tz::Asia::Shanghai);
        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let row = OwnedRow::new(vec![Some(ScalarImpl::Timestamptz(tstz))]);

        let encoded = encoder.encode(row).unwrap();

        assert_eq!(
            encoded.get("ts"),
            Some(&json!("2018-01-27 02:30:09.453000"))
        );
    }

    /// Exercises `datum_to_json_object` for scalar types under several configurations.
    #[test]
    fn test_to_json_basic_type() {
        let config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let boolean_value = encode_scalar(DataType::Boolean, "", ScalarImpl::Bool(false), &config);
        assert_eq!(boolean_value, json!(false));

        let int16_value = encode_scalar(DataType::Int16, "", ScalarImpl::Int16(16), &config);
        assert_eq!(int16_value, json!(16));

        let int64_value = encode_scalar(DataType::Int64, "", ScalarImpl::Int64(i64::MAX), &config);
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        let serial_value = encode_scalar(
            DataType::Serial,
            "",
            ScalarImpl::Serial(i64::MAX.into()),
            &config,
        );
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = encode_scalar(
            DataType::Timestamptz,
            "",
            ScalarImpl::Timestamptz(tstz_inner),
            &config,
        );
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        let unix_wo_suffix_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = encode_scalar(
            DataType::Timestamptz,
            "",
            ScalarImpl::Timestamptz(tstz_inner),
            &unix_wo_suffix_config,
        );
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        let timestamp_milli_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let ts_value = encode_scalar(
            DataType::Timestamp,
            "",
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)),
            &timestamp_milli_config,
        );
        assert_eq!(ts_value, json!(1000 * 1000));

        let ts_value = encode_scalar(
            DataType::Timestamp,
            "",
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)),
            &config,
        );
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Represents the number of milliseconds past midnight.
        let time_value = encode_scalar(
            DataType::Time,
            "",
            ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0)),
            &config,
        );
        assert_eq!(time_value, json!(1000 * 1000));

        let interval_value = encode_scalar(
            DataType::Interval,
            "",
            ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000)),
            &config,
        );
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        let mut map = HashMap::default();
        map.insert("aaa".to_owned(), 5_u8);
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(map),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let decimal = encode_scalar(
            DataType::Decimal,
            "aaa",
            ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()),
            &doris_config,
        );
        assert_eq!(decimal, json!("1.11111"));

        let date_value = encode_scalar(
            DataType::Date,
            "",
            ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)),
            &config,
        );
        assert_eq!(date_value, json!(719163));

        let from_epoch_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = encode_scalar(
            DataType::Date,
            "",
            ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)),
            &from_epoch_config,
        );
        assert_eq!(date_value, json!(0));

        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = encode_scalar(
            DataType::Date,
            "",
            ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)),
            &doris_config,
        );
        assert_eq!(date_value, json!("2010-10-10"));

        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);

        // Doris structs are emitted as a JSON string that keeps the declared field order.
        let struct_value = datum_to_json_object(
            &Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("v3", DataType::Int32),
                    ("v2", DataType::Int32),
                    ("v1", DataType::Int32),
                ])),
                name: Default::default(),
            },
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris_config,
        )
        .unwrap();
        assert_eq!(struct_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        let encode_jsonb_obj_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
        };
        let json_value = encode_scalar(
            DataType::Jsonb,
            "",
            ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))),
            &encode_jsonb_obj_config,
        );
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    /// The generated `schema` section must match the expected JSON text exactly.
    #[test]
    fn test_generate_json_converter_schema() {
        let fields = vec![
            Field {
                data_type: DataType::Boolean,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Int16,
                name: "v2".into(),
            },
            Field {
                data_type: DataType::Int32,
                name: "v3".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v4".into(),
            },
            Field {
                data_type: DataType::Decimal,
                name: "v5".into(),
            },
            Field {
                data_type: DataType::Date,
                name: "v6".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v7".into(),
            },
            Field {
                data_type: DataType::Time,
                name: "v8".into(),
            },
            Field {
                data_type: DataType::Interval,
                name: "v9".into(),
            },
            Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    (
                        "c",
                        DataType::Struct(StructType::new(vec![
                            ("aa", DataType::Int64),
                            ("bb", DataType::Float64),
                        ])),
                    ),
                ])),
                name: "v10".into(),
            },
            Field {
                data_type: DataType::list(DataType::list(DataType::Struct(StructType::new(vec![
                    ("aa", DataType::Int64),
                    ("bb", DataType::Float64),
                ])))),
                name: "v11".into(),
            },
            Field {
                data_type: DataType::Jsonb,
                name: "12".into(),
            },
            Field {
                data_type: DataType::Serial,
                name: "13".into(),
            },
            Field {
                data_type: DataType::Int256,
                name: "14".into(),
            },
        ];
        let schema =
            json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
                .to_string();
        let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, ans);
    }
}