risingwave_connector/sink/encoder/
json.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use chrono_tz::Tz;
23use indexmap::IndexMap;
24use itertools::Itertools;
25use risingwave_common::array::{ArrayError, ArrayResult};
26use risingwave_common::catalog::{Field, Schema};
27use risingwave_common::row::Row;
28use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
29use risingwave_common::util::iter_util::ZipEqDebug;
30use serde_json::{Map, Value, json};
31use thiserror_ext::AsReport;
32
33use super::{
34    CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
35    Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
36};
37use crate::sink::SinkError;
38
39pub struct JsonEncoderConfig {
40    time_handling_mode: TimeHandlingMode,
41    date_handling_mode: DateHandlingMode,
42    timestamp_handling_mode: TimestampHandlingMode,
43    timestamptz_handling_mode: TimestamptzHandlingMode,
44    custom_json_type: CustomJsonType,
45    jsonb_handling_mode: JsonbHandlingMode,
46}
47
48pub struct JsonEncoder {
49    schema: Schema,
50    col_indices: Option<Vec<usize>>,
51    kafka_connect: Option<KafkaConnectParamsRef>,
52    config: JsonEncoderConfig,
53}
54
55impl JsonEncoder {
56    pub fn new(
57        schema: Schema,
58        col_indices: Option<Vec<usize>>,
59        date_handling_mode: DateHandlingMode,
60        timestamp_handling_mode: TimestampHandlingMode,
61        timestamptz_handling_mode: TimestamptzHandlingMode,
62        time_handling_mode: TimeHandlingMode,
63        jsonb_handling_mode: JsonbHandlingMode,
64    ) -> Self {
65        let config = JsonEncoderConfig {
66            time_handling_mode,
67            date_handling_mode,
68            timestamp_handling_mode,
69            timestamptz_handling_mode,
70            custom_json_type: CustomJsonType::None,
71            jsonb_handling_mode,
72        };
73        Self {
74            schema,
75            col_indices,
76            kafka_connect: None,
77            config,
78        }
79    }
80
81    pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
82        let config = JsonEncoderConfig {
83            time_handling_mode: TimeHandlingMode::String,
84            date_handling_mode: DateHandlingMode::String,
85            timestamp_handling_mode: TimestampHandlingMode::String,
86            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
87            custom_json_type: CustomJsonType::Es,
88            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
89        };
90        Self {
91            schema,
92            col_indices,
93            kafka_connect: None,
94            config,
95        }
96    }
97
98    pub fn new_with_doris(
99        schema: Schema,
100        col_indices: Option<Vec<usize>>,
101        map: HashMap<String, u8>,
102    ) -> Self {
103        let config = JsonEncoderConfig {
104            time_handling_mode: TimeHandlingMode::Milli,
105            date_handling_mode: DateHandlingMode::String,
106            timestamp_handling_mode: TimestampHandlingMode::String,
107            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
108            custom_json_type: CustomJsonType::Doris(map),
109            jsonb_handling_mode: JsonbHandlingMode::String,
110        };
111        Self {
112            schema,
113            col_indices,
114            kafka_connect: None,
115            config,
116        }
117    }
118
119    pub fn new_with_starrocks(
120        schema: Schema,
121        col_indices: Option<Vec<usize>>,
122        time_zone: Tz,
123    ) -> Self {
124        let config = JsonEncoderConfig {
125            time_handling_mode: TimeHandlingMode::Milli,
126            date_handling_mode: DateHandlingMode::String,
127            timestamp_handling_mode: TimestampHandlingMode::String,
128            timestamptz_handling_mode: TimestamptzHandlingMode::SpecifiedTimezoneWithoutSuffix(
129                time_zone,
130            ),
131            custom_json_type: CustomJsonType::StarRocks,
132            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
133        };
134        Self {
135            schema,
136            col_indices,
137            kafka_connect: None,
138            config,
139        }
140    }
141
142    pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
143        Self {
144            kafka_connect: Some(Arc::new(kafka_connect)),
145            ..self
146        }
147    }
148}
149
150impl RowEncoder for JsonEncoder {
151    type Output = Map<String, Value>;
152
153    fn schema(&self) -> &Schema {
154        &self.schema
155    }
156
157    fn col_indices(&self) -> Option<&[usize]> {
158        self.col_indices.as_ref().map(Vec::as_ref)
159    }
160
161    fn encode_cols(
162        &self,
163        row: impl Row,
164        col_indices: impl Iterator<Item = usize>,
165    ) -> Result<Self::Output> {
166        let mut mappings = Map::with_capacity(self.schema.len());
167        let col_indices = col_indices.collect_vec();
168        for idx in &col_indices {
169            let field = &self.schema[*idx];
170            let key = field.name.clone();
171            let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
172                .map_err(|e| SinkError::Encode(e.to_report_string()))?;
173            mappings.insert(key, value);
174        }
175
176        Ok(if let Some(param) = &self.kafka_connect {
177            json_converter_with_schema(
178                Value::Object(mappings),
179                param.schema_name.clone(),
180                col_indices.into_iter().map(|i| &self.schema[i]),
181            )
182        } else {
183            mappings
184        })
185    }
186}
187
188impl SerTo<String> for Map<String, Value> {
189    fn ser_to(self) -> Result<String> {
190        Value::Object(self).ser_to()
191    }
192}
193
194impl SerTo<String> for Value {
195    fn ser_to(self) -> Result<String> {
196        Ok(self.to_string())
197    }
198}
199
200fn datum_to_json_object(
201    field: &Field,
202    datum: DatumRef<'_>,
203    config: &JsonEncoderConfig,
204) -> ArrayResult<Value> {
205    let scalar_ref = match datum {
206        None => {
207            return Ok(Value::Null);
208        }
209        Some(datum) => datum,
210    };
211
212    let data_type = field.data_type();
213
214    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
215
216    let value = match (data_type, scalar_ref) {
217        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
218            json!(v)
219        }
220        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
221            json!(v)
222        }
223        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
224            json!(v)
225        }
226        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
227            json!(v)
228        }
229        (DataType::Serial, ScalarRefImpl::Serial(v)) => {
230            // The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers.
231            json!(format!("{:#018x}", v.into_inner()))
232        }
233        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
234            json!(f32::from(v))
235        }
236        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
237            json!(f64::from(v))
238        }
239        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
240            json!(v)
241        }
242        // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
243        (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
244            CustomJsonType::Doris(map) => {
245                let s = map.get(&field.name).unwrap();
246                v.rescale(*s as u32);
247                json!(v.to_text())
248            }
249            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
250                json!(v.to_text())
251            }
252        },
253        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
254            match config.timestamptz_handling_mode {
255                TimestamptzHandlingMode::UtcString => {
256                    let parsed = v.to_datetime_utc();
257                    let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
258                    json!(v)
259                }
260                TimestamptzHandlingMode::UtcWithoutSuffix => {
261                    let parsed = v.to_datetime_utc().naive_utc();
262                    let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
263                    json!(v)
264                }
265                TimestamptzHandlingMode::SpecifiedTimezoneWithoutSuffix(time_zone) => {
266                    let parsed = v.to_datetime_in_zone(time_zone).naive_local();
267                    let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
268                    json!(v)
269                }
270                TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
271                TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
272            }
273        }
274        (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
275            TimeHandlingMode::Milli => {
276                // todo: just ignore the nanos part to avoid leap second complex
277                json!(v.0.num_seconds_from_midnight() as i64 * 1000)
278            }
279            TimeHandlingMode::String => {
280                let a = v.0.format("%H:%M:%S%.6f").to_string();
281                json!(a)
282            }
283        },
284        (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
285            DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
286            DateHandlingMode::FromEpoch => {
287                let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
288                json!(duration.num_days())
289            }
290            DateHandlingMode::String => {
291                let a = v.0.format("%Y-%m-%d").to_string();
292                json!(a)
293            }
294        },
295        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
296            match config.timestamp_handling_mode {
297                TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
298                TimestampHandlingMode::String => {
299                    json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
300                }
301            }
302        }
303        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
304            json!(general_purpose::STANDARD.encode(v))
305        }
306        // P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S
307        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
308            json!(v.as_iso_8601())
309        }
310
311        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
312            JsonbHandlingMode::String => {
313                json!(jsonb_ref.to_string())
314            }
315            JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
316        },
317        (DataType::List(lt), ScalarRefImpl::List(list_ref)) => {
318            let elems = list_ref.iter();
319            let mut vec = Vec::with_capacity(elems.len());
320            let inner_field = Field::unnamed(lt.into_elem());
321            for sub_datum_ref in elems {
322                let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
323                vec.push(value);
324            }
325            json!(vec)
326        }
327        (DataType::Vector(_), ScalarRefImpl::Vector(vector)) => {
328            let elems = vector.as_raw_slice();
329            let mut vec = Vec::with_capacity(elems.len());
330            for v in elems {
331                let value = serde_json::Number::from_f64(*v as _)
332                    .map(Value::Number)
333                    .unwrap_or(Value::Null);
334                vec.push(value);
335            }
336            json!(vec)
337        }
338        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
339            match config.custom_json_type {
340                CustomJsonType::Doris(_) => {
341                    // We need to ensure that the order of elements in the json matches the insertion order.
342                    let mut map = IndexMap::with_capacity(st.len());
343                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
344                        st.iter()
345                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
346                    ) {
347                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
348                        map.insert(sub_field.name.clone(), value);
349                    }
350                    Value::String(
351                        serde_json::to_string(&map).context("failed to serialize into JSON")?,
352                    )
353                }
354                CustomJsonType::StarRocks => {
355                    return Err(ArrayError::internal(
356                        "starrocks can't support struct".to_owned(),
357                    ));
358                }
359                CustomJsonType::Es | CustomJsonType::None => {
360                    let mut map = Map::with_capacity(st.len());
361                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
362                        st.iter()
363                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
364                    ) {
365                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
366                        map.insert(sub_field.name.clone(), value);
367                    }
368                    json!(map)
369                }
370            }
371        }
372        // TODO(map): support map
373        (data_type, scalar_ref) => {
374            return Err(ArrayError::internal(format!(
375                "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
376                field.name, data_type, scalar_ref
377            )));
378        }
379    };
380
381    Ok(value)
382}
383
384fn json_converter_with_schema<'a>(
385    object: Value,
386    name: String,
387    fields: impl Iterator<Item = &'a Field>,
388) -> Map<String, Value> {
389    let mut mapping = Map::with_capacity(2);
390    mapping.insert(
391        "schema".to_owned(),
392        json!({
393            "type": "struct",
394            "fields": fields.map(|field| {
395                let mut mapping = type_as_json_schema(&field.data_type);
396                mapping.insert("field".to_owned(), json!(field.name));
397                mapping
398            }).collect_vec(),
399            "optional": false,
400            "name": name,
401        }),
402    );
403    mapping.insert("payload".to_owned(), object);
404    mapping
405}
406
407// reference: https://github.com/apache/kafka/blob/80982c4ae3fe6be127b48ec09caff11ab5f87c69/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java#L39
408pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
409    match rw_type {
410        DataType::Boolean => "boolean",
411        DataType::Int16 => "int16",
412        DataType::Int32 => "int32",
413        DataType::Int64 => "int64",
414        DataType::Float32 => "float",
415        DataType::Float64 => "double",
416        DataType::Decimal => "string",
417        DataType::Date => "int32",
418        DataType::Varchar => "string",
419        DataType::Time => "int64",
420        DataType::Timestamp => "int64",
421        DataType::Timestamptz => "string",
422        DataType::Interval => "string",
423        DataType::Struct(_) => "struct",
424        DataType::List(_) => "array",
425        DataType::Vector(_) => "array",
426        DataType::Bytea => "bytes",
427        DataType::Jsonb => "string",
428        DataType::Serial => "string",
429        DataType::Int256 => "string",
430        DataType::Map(_) => "map",
431    }
432}
433
434fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
435    let mut mapping = Map::with_capacity(4); // type + optional + fields/items + field
436    mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
437    mapping.insert("optional".to_owned(), json!(true));
438    match rw_type {
439        DataType::Struct(struct_type) => {
440            let sub_fields = struct_type
441                .iter()
442                .map(|(sub_name, sub_type)| {
443                    let mut sub_mapping = type_as_json_schema(sub_type);
444                    sub_mapping.insert("field".to_owned(), json!(sub_name));
445                    sub_mapping
446                })
447                .collect_vec();
448            mapping.insert("fields".to_owned(), json!(sub_fields));
449        }
450        DataType::List(list_type) => {
451            mapping.insert(
452                "items".to_owned(),
453                json!(type_as_json_schema(list_type.elem())),
454            );
455        }
456        _ => {}
457    }
458
459    mapping
460}
461
#[cfg(test)]
mod tests {
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    /// Config most assertions start from: millisecond times, days-from-CE dates, string
    /// timestamps, RFC 3339 UTC timestamptz, no sink customization and jsonb as a string.
    /// Cases needing something else override single fields with struct-update syntax.
    fn base_config() -> JsonEncoderConfig {
        JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        }
    }

    /// Encodes `datum` as a field named `name` of type `data_type`; panics on encoder errors.
    fn encode_datum_named(
        name: &str,
        data_type: DataType,
        datum: DatumRef<'_>,
        config: &JsonEncoderConfig,
    ) -> Value {
        let field = Field {
            data_type,
            name: name.to_owned(),
        };
        datum_to_json_object(&field, datum, config).unwrap()
    }

    /// Same as `encode_datum_named`, with an empty field name.
    fn encode_datum(data_type: DataType, datum: DatumRef<'_>, config: &JsonEncoderConfig) -> Value {
        encode_datum_named("", data_type, datum, config)
    }

    #[test]
    fn test_starrocks_timestamptz_encoding() {
        let encoder = JsonEncoder::new_with_starrocks(
            Schema::new(vec![Field::with_name(DataType::Timestamptz, "ts")]),
            None,
            chrono_tz::Asia::Shanghai,
        );
        let tstz = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let encoded = encoder
            .encode(OwnedRow::new(vec![Some(ScalarImpl::Timestamptz(tstz))]))
            .unwrap();

        // 18:30 UTC is 02:30 on the following day in Asia/Shanghai (UTC+8).
        assert_eq!(
            encoded.get("ts"),
            Some(&json!("2018-01-27 02:30:09.453000"))
        );
    }

    #[test]
    fn test_to_json_basic_type() {
        let config = base_config();

        let boolean_value = encode_datum(
            DataType::Boolean,
            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(boolean_value, json!(false));

        let int16_value = encode_datum(
            DataType::Int16,
            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(int16_value, json!(16));

        // i64::MAX must come out as an exact JSON integer.
        let int64_value = encode_datum(
            DataType::Int64,
            Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        // Serial is rendered as a zero-padded hex string.
        let serial_value = encode_datum(
            DataType::Serial,
            Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = encode_datum(
            DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        let utc_without_suffix = JsonEncoderConfig {
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            ..base_config()
        };
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = encode_datum(
            DataType::Timestamptz,
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &utc_without_suffix,
        );
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        let timestamp_as_millis = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            ..base_config()
        };
        let ts_value = encode_datum(
            DataType::Timestamp,
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &timestamp_as_millis,
        );
        assert_eq!(ts_value, json!(1000 * 1000));

        let ts_value = encode_datum(
            DataType::Timestamp,
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        );
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Represents the number of milliseconds past midnight, org.apache.kafka.connect.data.Time
        let time_value = encode_datum(
            DataType::Time,
            Some(
                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        );
        assert_eq!(time_value, json!(1000 * 1000));

        let interval_value = encode_datum(
            DataType::Interval,
            Some(
                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
                    .as_scalar_ref_impl(),
            ),
            &config,
        );
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        // Doris rescales decimals to the scale registered for the column name.
        let doris_with_scale = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            custom_json_type: CustomJsonType::Doris(HashMap::from([("aaa".to_owned(), 5_u8)])),
            ..base_config()
        };
        let decimal = encode_datum_named(
            "aaa",
            DataType::Decimal,
            Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
            &doris_with_scale,
        );
        assert_eq!(decimal, json!("1.11111"));

        let date_value = encode_datum(
            DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &config,
        );
        assert_eq!(date_value, json!(719163));

        let days_from_epoch = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            ..base_config()
        };
        let date_value = encode_datum(
            DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &days_from_epoch,
        );
        assert_eq!(date_value, json!(0));

        let doris = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            ..base_config()
        };
        let date_value = encode_datum(
            DataType::Date,
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
            &doris,
        );
        assert_eq!(date_value, json!("2010-10-10"));

        // Doris structs are JSON strings whose keys keep declaration order.
        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);
        let struct_value = encode_datum(
            DataType::Struct(StructType::new(vec![
                ("v3", DataType::Int32),
                ("v2", DataType::Int32),
                ("v1", DataType::Int32),
            ])),
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris,
        );
        assert_eq!(struct_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        let jsonb_as_json = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
            ..base_config()
        };
        let json_value = encode_datum(
            DataType::Jsonb,
            Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
            &jsonb_as_json,
        );
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    #[test]
    fn test_generate_json_converter_schema() {
        let inner_struct = || {
            DataType::Struct(StructType::new(vec![
                ("aa", DataType::Int64),
                ("bb", DataType::Float64),
            ]))
        };
        let fields = [
            Field::with_name(DataType::Boolean, "v1"),
            Field::with_name(DataType::Int16, "v2"),
            Field::with_name(DataType::Int32, "v3"),
            Field::with_name(DataType::Float32, "v4"),
            Field::with_name(DataType::Decimal, "v5"),
            Field::with_name(DataType::Date, "v6"),
            Field::with_name(DataType::Varchar, "v7"),
            Field::with_name(DataType::Time, "v8"),
            Field::with_name(DataType::Interval, "v9"),
            Field::with_name(
                DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    ("c", inner_struct()),
                ])),
                "v10",
            ),
            Field::with_name(DataType::list(DataType::list(inner_struct())), "v11"),
            Field::with_name(DataType::Jsonb, "12"),
            Field::with_name(DataType::Serial, "13"),
            Field::with_name(DataType::Int256, "14"),
        ];

        let envelope = json_converter_with_schema(json!({}), "test".to_owned(), fields.iter());
        let schema = envelope["schema"].to_string();
        let expected = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, expected);
    }
}