risingwave_connector/sink/encoder/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33    CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34    Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39    time_handling_mode: TimeHandlingMode,
40    date_handling_mode: DateHandlingMode,
41    timestamp_handling_mode: TimestampHandlingMode,
42    timestamptz_handling_mode: TimestamptzHandlingMode,
43    custom_json_type: CustomJsonType,
44    jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48    schema: Schema,
49    col_indices: Option<Vec<usize>>,
50    kafka_connect: Option<KafkaConnectParamsRef>,
51    config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55    pub fn new(
56        schema: Schema,
57        col_indices: Option<Vec<usize>>,
58        date_handling_mode: DateHandlingMode,
59        timestamp_handling_mode: TimestampHandlingMode,
60        timestamptz_handling_mode: TimestamptzHandlingMode,
61        time_handling_mode: TimeHandlingMode,
62        jsonb_handling_mode: JsonbHandlingMode,
63    ) -> Self {
64        let config = JsonEncoderConfig {
65            time_handling_mode,
66            date_handling_mode,
67            timestamp_handling_mode,
68            timestamptz_handling_mode,
69            custom_json_type: CustomJsonType::None,
70            jsonb_handling_mode,
71        };
72        Self {
73            schema,
74            col_indices,
75            kafka_connect: None,
76            config,
77        }
78    }
79
80    pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81        let config = JsonEncoderConfig {
82            time_handling_mode: TimeHandlingMode::String,
83            date_handling_mode: DateHandlingMode::String,
84            timestamp_handling_mode: TimestampHandlingMode::String,
85            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86            custom_json_type: CustomJsonType::Es,
87            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88        };
89        Self {
90            schema,
91            col_indices,
92            kafka_connect: None,
93            config,
94        }
95    }
96
97    pub fn new_with_doris(
98        schema: Schema,
99        col_indices: Option<Vec<usize>>,
100        map: HashMap<String, u8>,
101    ) -> Self {
102        let config = JsonEncoderConfig {
103            time_handling_mode: TimeHandlingMode::Milli,
104            date_handling_mode: DateHandlingMode::String,
105            timestamp_handling_mode: TimestampHandlingMode::String,
106            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107            custom_json_type: CustomJsonType::Doris(map),
108            jsonb_handling_mode: JsonbHandlingMode::String,
109        };
110        Self {
111            schema,
112            col_indices,
113            kafka_connect: None,
114            config,
115        }
116    }
117
118    pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119        let config = JsonEncoderConfig {
120            time_handling_mode: TimeHandlingMode::Milli,
121            date_handling_mode: DateHandlingMode::String,
122            timestamp_handling_mode: TimestampHandlingMode::String,
123            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124            custom_json_type: CustomJsonType::StarRocks,
125            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126        };
127        Self {
128            schema,
129            col_indices,
130            kafka_connect: None,
131            config,
132        }
133    }
134
135    pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136        Self {
137            kafka_connect: Some(Arc::new(kafka_connect)),
138            ..self
139        }
140    }
141}
142
143impl RowEncoder for JsonEncoder {
144    type Output = Map<String, Value>;
145
146    fn schema(&self) -> &Schema {
147        &self.schema
148    }
149
150    fn col_indices(&self) -> Option<&[usize]> {
151        self.col_indices.as_ref().map(Vec::as_ref)
152    }
153
154    fn encode_cols(
155        &self,
156        row: impl Row,
157        col_indices: impl Iterator<Item = usize>,
158    ) -> Result<Self::Output> {
159        let mut mappings = Map::with_capacity(self.schema.len());
160        let col_indices = col_indices.collect_vec();
161        for idx in &col_indices {
162            let field = &self.schema[*idx];
163            let key = field.name.clone();
164            let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165                .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166            mappings.insert(key, value);
167        }
168
169        Ok(if let Some(param) = &self.kafka_connect {
170            json_converter_with_schema(
171                Value::Object(mappings),
172                param.schema_name.to_owned(),
173                col_indices.into_iter().map(|i| &self.schema[i]),
174            )
175        } else {
176            mappings
177        })
178    }
179}
180
181impl SerTo<String> for Map<String, Value> {
182    fn ser_to(self) -> Result<String> {
183        Value::Object(self).ser_to()
184    }
185}
186
187impl SerTo<String> for Value {
188    fn ser_to(self) -> Result<String> {
189        Ok(self.to_string())
190    }
191}
192
193fn datum_to_json_object(
194    field: &Field,
195    datum: DatumRef<'_>,
196    config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198    let scalar_ref = match datum {
199        None => {
200            return Ok(Value::Null);
201        }
202        Some(datum) => datum,
203    };
204
205    let data_type = field.data_type();
206
207    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209    let value = match (data_type, scalar_ref) {
210        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211            json!(v)
212        }
213        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214            json!(v)
215        }
216        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217            json!(v)
218        }
219        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220            json!(v)
221        }
222        (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223            // The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers.
224            json!(format!("{:#018x}", v.into_inner()))
225        }
226        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227            json!(f32::from(v))
228        }
229        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230            json!(f64::from(v))
231        }
232        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233            json!(v)
234        }
235        // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
236        (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237            CustomJsonType::Doris(map) => {
238                let s = map.get(&field.name).unwrap();
239                v.rescale(*s as u32);
240                json!(v.to_text())
241            }
242            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243                json!(v.to_text())
244            }
245        },
246        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247            match config.timestamptz_handling_mode {
248                TimestamptzHandlingMode::UtcString => {
249                    let parsed = v.to_datetime_utc();
250                    let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251                    json!(v)
252                }
253                TimestamptzHandlingMode::UtcWithoutSuffix => {
254                    let parsed = v.to_datetime_utc().naive_utc();
255                    let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256                    json!(v)
257                }
258                TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259                TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260            }
261        }
262        (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263            TimeHandlingMode::Milli => {
264                // todo: just ignore the nanos part to avoid leap second complex
265                json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266            }
267            TimeHandlingMode::String => {
268                let a = v.0.format("%H:%M:%S%.6f").to_string();
269                json!(a)
270            }
271        },
272        (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273            DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274            DateHandlingMode::FromEpoch => {
275                let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
276                json!(duration.num_days())
277            }
278            DateHandlingMode::String => {
279                let a = v.0.format("%Y-%m-%d").to_string();
280                json!(a)
281            }
282        },
283        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284            match config.timestamp_handling_mode {
285                TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286                TimestampHandlingMode::String => {
287                    json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288                }
289            }
290        }
291        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292            json!(general_purpose::STANDARD.encode(v))
293        }
294        // P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S
295        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296            json!(v.as_iso_8601())
297        }
298
299        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300            JsonbHandlingMode::String => {
301                json!(jsonb_ref.to_string())
302            }
303            JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304        },
305        (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306            let elems = list_ref.iter();
307            let mut vec = Vec::with_capacity(elems.len());
308            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309            for sub_datum_ref in elems {
310                let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311                vec.push(value);
312            }
313            json!(vec)
314        }
315        (DataType::Vector(_), ScalarRefImpl::Vector(vector)) => {
316            let elems = vector.as_raw_slice();
317            let mut vec = Vec::with_capacity(elems.len());
318            for v in elems {
319                let value = serde_json::Number::from_f64(*v as _)
320                    .map(Value::Number)
321                    .unwrap_or(Value::Null);
322                vec.push(value);
323            }
324            json!(vec)
325        }
326        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
327            match config.custom_json_type {
328                CustomJsonType::Doris(_) => {
329                    // We need to ensure that the order of elements in the json matches the insertion order.
330                    let mut map = IndexMap::with_capacity(st.len());
331                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
332                        st.iter()
333                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
334                    ) {
335                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
336                        map.insert(sub_field.name.clone(), value);
337                    }
338                    Value::String(
339                        serde_json::to_string(&map).context("failed to serialize into JSON")?,
340                    )
341                }
342                CustomJsonType::StarRocks => {
343                    return Err(ArrayError::internal(
344                        "starrocks can't support struct".to_owned(),
345                    ));
346                }
347                CustomJsonType::Es | CustomJsonType::None => {
348                    let mut map = Map::with_capacity(st.len());
349                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
350                        st.iter()
351                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
352                    ) {
353                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
354                        map.insert(sub_field.name.clone(), value);
355                    }
356                    json!(map)
357                }
358            }
359        }
360        // TODO(map): support map
361        (data_type, scalar_ref) => {
362            return Err(ArrayError::internal(format!(
363                "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
364                field.name, data_type, scalar_ref
365            )));
366        }
367    };
368
369    Ok(value)
370}
371
372fn json_converter_with_schema<'a>(
373    object: Value,
374    name: String,
375    fields: impl Iterator<Item = &'a Field>,
376) -> Map<String, Value> {
377    let mut mapping = Map::with_capacity(2);
378    mapping.insert(
379        "schema".to_owned(),
380        json!({
381            "type": "struct",
382            "fields": fields.map(|field| {
383                let mut mapping = type_as_json_schema(&field.data_type);
384                mapping.insert("field".to_owned(), json!(field.name));
385                mapping
386            }).collect_vec(),
387            "optional": false,
388            "name": name,
389        }),
390    );
391    mapping.insert("payload".to_owned(), object);
392    mapping
393}
394
395// reference: https://github.com/apache/kafka/blob/80982c4ae3fe6be127b48ec09caff11ab5f87c69/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java#L39
396pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
397    match rw_type {
398        DataType::Boolean => "boolean",
399        DataType::Int16 => "int16",
400        DataType::Int32 => "int32",
401        DataType::Int64 => "int64",
402        DataType::Float32 => "float",
403        DataType::Float64 => "double",
404        DataType::Decimal => "string",
405        DataType::Date => "int32",
406        DataType::Varchar => "string",
407        DataType::Time => "int64",
408        DataType::Timestamp => "int64",
409        DataType::Timestamptz => "string",
410        DataType::Interval => "string",
411        DataType::Struct(_) => "struct",
412        DataType::List(_) => "array",
413        DataType::Vector(_) => "array",
414        DataType::Bytea => "bytes",
415        DataType::Jsonb => "string",
416        DataType::Serial => "string",
417        DataType::Int256 => "string",
418        DataType::Map(_) => "map",
419    }
420}
421
422fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
423    let mut mapping = Map::with_capacity(4); // type + optional + fields/items + field
424    mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
425    mapping.insert("optional".to_owned(), json!(true));
426    match rw_type {
427        DataType::Struct(struct_type) => {
428            let sub_fields = struct_type
429                .iter()
430                .map(|(sub_name, sub_type)| {
431                    let mut sub_mapping = type_as_json_schema(sub_type);
432                    sub_mapping.insert("field".to_owned(), json!(sub_name));
433                    sub_mapping
434                })
435                .collect_vec();
436            mapping.insert("fields".to_owned(), json!(sub_fields));
437        }
438        DataType::List(sub_type) => {
439            mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
440        }
441        _ => {}
442    }
443
444    mapping
445}
446
#[cfg(test)]
mod tests {
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    /// Exercises `datum_to_json_object` on scalar, temporal, Doris-specific, struct and jsonb
    /// values under several `JsonEncoderConfig` combinations.
    #[test]
    fn test_to_json_basic_type() {
        // Template field: each case below overrides `data_type` and keeps the empty name.
        let mock_field = Field {
            data_type: DataType::Boolean,
            name: Default::default(),
        };

        // Baseline config: Milli time, days-from-CE dates, string timestamps, RFC 3339 tstz.
        let config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let boolean_value = datum_to_json_object(
            &Field {
                data_type: DataType::Boolean,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(boolean_value, json!(false));

        let int16_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int16,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(int16_value, json!(16));

        // i64::MAX must round-trip exactly as a JSON number.
        let int64_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int64,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        // Serial values are emitted as zero-padded hex strings.
        let serial_value = datum_to_json_object(
            &Field {
                data_type: DataType::Serial,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        // Same instant, rendered as UTC text without the trailing `Z`.
        let unix_wo_suffix_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &unix_wo_suffix_config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        // Timestamp as epoch milliseconds.
        let timestamp_milli_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &timestamp_milli_config,
        )
        .unwrap();
        assert_eq!(ts_value, json!(1000 * 1000));

        // Same timestamp as text under the baseline (String) config.
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Represents the number of milliseconds past midnight, org.apache.kafka.connect.data.Time
        let time_value = datum_to_json_object(
            &Field {
                data_type: DataType::Time,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(time_value, json!(1000 * 1000));

        // Intervals use the ISO 8601 duration format.
        let interval_value = datum_to_json_object(
            &Field {
                data_type: DataType::Interval,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        // Doris rescales a decimal to the scale registered for its column name ("aaa" -> 5).
        let mut map = HashMap::default();
        map.insert("aaa".to_owned(), 5_u8);
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(map),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let decimal = datum_to_json_object(
            &Field {
                data_type: DataType::Decimal,
                name: "aaa".to_owned(),
            },
            Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(decimal, json!("1.11111"));

        // 1970-01-01 is day 719163 counted from the Common Era.
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(date_value, json!(719163));

        // ...and day 0 counted from the Unix epoch.
        let from_epoch_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &from_epoch_config,
        )
        .unwrap();
        assert_eq!(date_value, json!(0));

        // Doris config with no decimal scales, used for date and struct cases.
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(date_value, json!("2010-10-10"));

        // Doris encodes a struct as a JSON string whose keys follow the declared field order
        // (v3, v2, v1), not alphabetical order.
        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);

        let interval_value = datum_to_json_object(
            &Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("v3", DataType::Int32),
                    ("v2", DataType::Int32),
                    ("v1", DataType::Int32),
                ])),
                ..mock_field.clone()
            },
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris_config,
        )
        .unwrap();
        assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        // With `JsonbHandlingMode::Dynamic`, jsonb is embedded as a real JSON value.
        let encode_jsonb_obj_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
        };
        let json_value = datum_to_json_object(
            &Field {
                data_type: DataType::Jsonb,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
            &encode_jsonb_obj_config,
        )
        .unwrap();
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    /// Checks the Kafka Connect schema generated for a mix of scalar, nested struct and nested
    /// list columns. The expected string's key order is whatever `serde_json::Map` yields for
    /// these inserts (here: alphabetical).
    #[test]
    fn test_generate_json_converter_schema() {
        let fields = vec![
            Field {
                data_type: DataType::Boolean,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Int16,
                name: "v2".into(),
            },
            Field {
                data_type: DataType::Int32,
                name: "v3".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v4".into(),
            },
            Field {
                data_type: DataType::Decimal,
                name: "v5".into(),
            },
            Field {
                data_type: DataType::Date,
                name: "v6".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v7".into(),
            },
            Field {
                data_type: DataType::Time,
                name: "v8".into(),
            },
            Field {
                data_type: DataType::Interval,
                name: "v9".into(),
            },
            Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    (
                        "c",
                        DataType::Struct(StructType::new(vec![
                            ("aa", DataType::Int64),
                            ("bb", DataType::Float64),
                        ])),
                    ),
                ])),
                name: "v10".into(),
            },
            Field {
                data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
                    StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
                ))))),
                name: "v11".into(),
            },
            Field {
                data_type: DataType::Jsonb,
                name: "12".into(),
            },
            Field {
                data_type: DataType::Serial,
                name: "13".into(),
            },
            Field {
                data_type: DataType::Int256,
                name: "14".into(),
            },
        ];
        let schema =
            json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
                .to_string();
        let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, ans);
    }
}