risingwave_connector/sink/encoder/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{Datelike, NaiveDateTime, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33    CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34    Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
38pub struct JsonEncoderConfig {
39    time_handling_mode: TimeHandlingMode,
40    date_handling_mode: DateHandlingMode,
41    timestamp_handling_mode: TimestampHandlingMode,
42    timestamptz_handling_mode: TimestamptzHandlingMode,
43    custom_json_type: CustomJsonType,
44    jsonb_handling_mode: JsonbHandlingMode,
45}
46
47pub struct JsonEncoder {
48    schema: Schema,
49    col_indices: Option<Vec<usize>>,
50    kafka_connect: Option<KafkaConnectParamsRef>,
51    config: JsonEncoderConfig,
52}
53
54impl JsonEncoder {
55    pub fn new(
56        schema: Schema,
57        col_indices: Option<Vec<usize>>,
58        date_handling_mode: DateHandlingMode,
59        timestamp_handling_mode: TimestampHandlingMode,
60        timestamptz_handling_mode: TimestamptzHandlingMode,
61        time_handling_mode: TimeHandlingMode,
62        jsonb_handling_mode: JsonbHandlingMode,
63    ) -> Self {
64        let config = JsonEncoderConfig {
65            time_handling_mode,
66            date_handling_mode,
67            timestamp_handling_mode,
68            timestamptz_handling_mode,
69            custom_json_type: CustomJsonType::None,
70            jsonb_handling_mode,
71        };
72        Self {
73            schema,
74            col_indices,
75            kafka_connect: None,
76            config,
77        }
78    }
79
80    pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81        let config = JsonEncoderConfig {
82            time_handling_mode: TimeHandlingMode::String,
83            date_handling_mode: DateHandlingMode::String,
84            timestamp_handling_mode: TimestampHandlingMode::String,
85            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86            custom_json_type: CustomJsonType::Es,
87            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88        };
89        Self {
90            schema,
91            col_indices,
92            kafka_connect: None,
93            config,
94        }
95    }
96
97    pub fn new_with_doris(
98        schema: Schema,
99        col_indices: Option<Vec<usize>>,
100        map: HashMap<String, u8>,
101    ) -> Self {
102        let config = JsonEncoderConfig {
103            time_handling_mode: TimeHandlingMode::Milli,
104            date_handling_mode: DateHandlingMode::String,
105            timestamp_handling_mode: TimestampHandlingMode::String,
106            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107            custom_json_type: CustomJsonType::Doris(map),
108            jsonb_handling_mode: JsonbHandlingMode::String,
109        };
110        Self {
111            schema,
112            col_indices,
113            kafka_connect: None,
114            config,
115        }
116    }
117
118    pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119        let config = JsonEncoderConfig {
120            time_handling_mode: TimeHandlingMode::Milli,
121            date_handling_mode: DateHandlingMode::String,
122            timestamp_handling_mode: TimestampHandlingMode::String,
123            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124            custom_json_type: CustomJsonType::StarRocks,
125            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126        };
127        Self {
128            schema,
129            col_indices,
130            kafka_connect: None,
131            config,
132        }
133    }
134
135    pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136        Self {
137            kafka_connect: Some(Arc::new(kafka_connect)),
138            ..self
139        }
140    }
141}
142
143impl RowEncoder for JsonEncoder {
144    type Output = Map<String, Value>;
145
146    fn schema(&self) -> &Schema {
147        &self.schema
148    }
149
150    fn col_indices(&self) -> Option<&[usize]> {
151        self.col_indices.as_ref().map(Vec::as_ref)
152    }
153
154    fn encode_cols(
155        &self,
156        row: impl Row,
157        col_indices: impl Iterator<Item = usize>,
158    ) -> Result<Self::Output> {
159        let mut mappings = Map::with_capacity(self.schema.len());
160        let col_indices = col_indices.collect_vec();
161        for idx in &col_indices {
162            let field = &self.schema[*idx];
163            let key = field.name.clone();
164            let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165                .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166            mappings.insert(key, value);
167        }
168
169        Ok(if let Some(param) = &self.kafka_connect {
170            json_converter_with_schema(
171                Value::Object(mappings),
172                param.schema_name.to_owned(),
173                col_indices.into_iter().map(|i| &self.schema[i]),
174            )
175        } else {
176            mappings
177        })
178    }
179}
180
181impl SerTo<String> for Map<String, Value> {
182    fn ser_to(self) -> Result<String> {
183        Value::Object(self).ser_to()
184    }
185}
186
187impl SerTo<String> for Value {
188    fn ser_to(self) -> Result<String> {
189        Ok(self.to_string())
190    }
191}
192
193fn datum_to_json_object(
194    field: &Field,
195    datum: DatumRef<'_>,
196    config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198    let scalar_ref = match datum {
199        None => {
200            return Ok(Value::Null);
201        }
202        Some(datum) => datum,
203    };
204
205    let data_type = field.data_type();
206
207    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209    let value = match (data_type, scalar_ref) {
210        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211            json!(v)
212        }
213        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214            json!(v)
215        }
216        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217            json!(v)
218        }
219        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220            json!(v)
221        }
222        (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223            // The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers.
224            json!(format!("{:#018x}", v.into_inner()))
225        }
226        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227            json!(f32::from(v))
228        }
229        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230            json!(f64::from(v))
231        }
232        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233            json!(v)
234        }
235        // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
236        (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237            CustomJsonType::Doris(map) => {
238                let s = map.get(&field.name).unwrap();
239                v.rescale(*s as u32);
240                json!(v.to_text())
241            }
242            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243                json!(v.to_text())
244            }
245        },
246        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247            match config.timestamptz_handling_mode {
248                TimestamptzHandlingMode::UtcString => {
249                    let parsed = v.to_datetime_utc();
250                    let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251                    json!(v)
252                }
253                TimestamptzHandlingMode::UtcWithoutSuffix => {
254                    let parsed = v.to_datetime_utc().naive_utc();
255                    let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256                    json!(v)
257                }
258                TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259                TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260            }
261        }
262        (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263            TimeHandlingMode::Milli => {
264                // todo: just ignore the nanos part to avoid leap second complex
265                json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266            }
267            TimeHandlingMode::String => {
268                let a = v.0.format("%H:%M:%S%.6f").to_string();
269                json!(a)
270            }
271        },
272        (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273            DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274            DateHandlingMode::FromEpoch => {
275                let duration = v.0 - NaiveDateTime::UNIX_EPOCH.date();
276                json!(duration.num_days())
277            }
278            DateHandlingMode::String => {
279                let a = v.0.format("%Y-%m-%d").to_string();
280                json!(a)
281            }
282        },
283        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284            match config.timestamp_handling_mode {
285                TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286                TimestampHandlingMode::String => {
287                    json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288                }
289            }
290        }
291        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292            json!(general_purpose::STANDARD.encode(v))
293        }
294        // P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S
295        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296            json!(v.as_iso_8601())
297        }
298
299        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300            JsonbHandlingMode::String => {
301                json!(jsonb_ref.to_string())
302            }
303            JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304        },
305        (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306            let elems = list_ref.iter();
307            let mut vec = Vec::with_capacity(elems.len());
308            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309            for sub_datum_ref in elems {
310                let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311                vec.push(value);
312            }
313            json!(vec)
314        }
315        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
316            match config.custom_json_type {
317                CustomJsonType::Doris(_) => {
318                    // We need to ensure that the order of elements in the json matches the insertion order.
319                    let mut map = IndexMap::with_capacity(st.len());
320                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
321                        st.iter()
322                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
323                    ) {
324                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
325                        map.insert(sub_field.name.clone(), value);
326                    }
327                    Value::String(
328                        serde_json::to_string(&map).context("failed to serialize into JSON")?,
329                    )
330                }
331                CustomJsonType::StarRocks => {
332                    return Err(ArrayError::internal(
333                        "starrocks can't support struct".to_owned(),
334                    ));
335                }
336                CustomJsonType::Es | CustomJsonType::None => {
337                    let mut map = Map::with_capacity(st.len());
338                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
339                        st.iter()
340                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
341                    ) {
342                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
343                        map.insert(sub_field.name.clone(), value);
344                    }
345                    json!(map)
346                }
347            }
348        }
349        // TODO(map): support map
350        (data_type, scalar_ref) => {
351            return Err(ArrayError::internal(format!(
352                "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
353                field.name, data_type, scalar_ref
354            )));
355        }
356    };
357
358    Ok(value)
359}
360
361fn json_converter_with_schema<'a>(
362    object: Value,
363    name: String,
364    fields: impl Iterator<Item = &'a Field>,
365) -> Map<String, Value> {
366    let mut mapping = Map::with_capacity(2);
367    mapping.insert(
368        "schema".to_owned(),
369        json!({
370            "type": "struct",
371            "fields": fields.map(|field| {
372                let mut mapping = type_as_json_schema(&field.data_type);
373                mapping.insert("field".to_owned(), json!(field.name));
374                mapping
375            }).collect_vec(),
376            "optional": false,
377            "name": name,
378        }),
379    );
380    mapping.insert("payload".to_owned(), object);
381    mapping
382}
383
384// reference: https://github.com/apache/kafka/blob/80982c4ae3fe6be127b48ec09caff11ab5f87c69/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java#L39
385pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
386    match rw_type {
387        DataType::Boolean => "boolean",
388        DataType::Int16 => "int16",
389        DataType::Int32 => "int32",
390        DataType::Int64 => "int64",
391        DataType::Float32 => "float",
392        DataType::Float64 => "double",
393        DataType::Decimal => "string",
394        DataType::Date => "int32",
395        DataType::Varchar => "string",
396        DataType::Time => "int64",
397        DataType::Timestamp => "int64",
398        DataType::Timestamptz => "string",
399        DataType::Interval => "string",
400        DataType::Struct(_) => "struct",
401        DataType::List(_) => "array",
402        DataType::Bytea => "bytes",
403        DataType::Jsonb => "string",
404        DataType::Serial => "string",
405        DataType::Int256 => "string",
406        DataType::Map(_) => "map",
407    }
408}
409
410fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
411    let mut mapping = Map::with_capacity(4); // type + optional + fields/items + field
412    mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
413    mapping.insert("optional".to_owned(), json!(true));
414    match rw_type {
415        DataType::Struct(struct_type) => {
416            let sub_fields = struct_type
417                .iter()
418                .map(|(sub_name, sub_type)| {
419                    let mut sub_mapping = type_as_json_schema(sub_type);
420                    sub_mapping.insert("field".to_owned(), json!(sub_name));
421                    sub_mapping
422                })
423                .collect_vec();
424            mapping.insert("fields".to_owned(), json!(sub_fields));
425        }
426        DataType::List(sub_type) => {
427            mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
428        }
429        _ => {}
430    }
431
432    mapping
433}
434
#[cfg(test)]
mod tests {
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    /// Encodes `scalar` as a non-null datum of a field called `name` with logical
    /// type `data_type`, panicking if the encoder reports an error.
    fn encode_named(
        name: &str,
        data_type: DataType,
        scalar: ScalarImpl,
        config: &JsonEncoderConfig,
    ) -> Value {
        let field = Field {
            data_type,
            name: name.to_owned(),
        };
        datum_to_json_object(&field, Some(scalar.as_scalar_ref_impl()), config).unwrap()
    }

    /// Same as [`encode_named`], for a field with an empty name.
    fn encode(data_type: DataType, scalar: ScalarImpl, config: &JsonEncoderConfig) -> Value {
        encode_named("", data_type, scalar, config)
    }

    #[test]
    fn test_to_json_basic_type() {
        let config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let boolean_value = encode(DataType::Boolean, ScalarImpl::Bool(false), &config);
        assert_eq!(boolean_value, json!(false));

        let int16_value = encode(DataType::Int16, ScalarImpl::Int16(16), &config);
        assert_eq!(int16_value, json!(16));

        let int64_value = encode(DataType::Int64, ScalarImpl::Int64(i64::MAX), &config);
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        let serial_value = encode(
            DataType::Serial,
            ScalarImpl::Serial(i64::MAX.into()),
            &config,
        );
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
        let tstz_value = encode(
            DataType::Timestamptz,
            ScalarImpl::Timestamptz("2018-01-26T18:30:09.453Z".parse().unwrap()),
            &config,
        );
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        let utc_wo_suffix_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let tstz_value = encode(
            DataType::Timestamptz,
            ScalarImpl::Timestamptz("2018-01-26T18:30:09.453Z".parse().unwrap()),
            &utc_wo_suffix_config,
        );
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        let timestamp_milli_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let ts_value = encode(
            DataType::Timestamp,
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)),
            &timestamp_milli_config,
        );
        assert_eq!(ts_value, json!(1000 * 1000));

        let ts_value = encode(
            DataType::Timestamp,
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)),
            &config,
        );
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Represents the number of milliseconds past midnight, org.apache.kafka.connect.data.Time
        let time_value = encode(
            DataType::Time,
            ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0)),
            &config,
        );
        assert_eq!(time_value, json!(1000 * 1000));

        let interval_value = encode(
            DataType::Interval,
            ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000)),
            &config,
        );
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        // Doris rescales decimals to the scale registered under the column name.
        let mut map = HashMap::default();
        map.insert("aaa".to_owned(), 5_u8);
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(map),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let decimal = encode_named(
            "aaa",
            DataType::Decimal,
            ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()),
            &doris_config,
        );
        assert_eq!(decimal, json!("1.11111"));

        let date_value = encode(
            DataType::Date,
            ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)),
            &config,
        );
        assert_eq!(date_value, json!(719163));

        let from_epoch_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = encode(
            DataType::Date,
            ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)),
            &from_epoch_config,
        );
        assert_eq!(date_value, json!(0));

        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = encode(
            DataType::Date,
            ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)),
            &doris_config,
        );
        assert_eq!(date_value, json!("2010-10-10"));

        // Doris structs are emitted as a JSON string that keeps the declared field order.
        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);
        let struct_field = Field {
            data_type: DataType::Struct(StructType::new(vec![
                ("v3", DataType::Int32),
                ("v2", DataType::Int32),
                ("v1", DataType::Int32),
            ])),
            name: Default::default(),
        };
        let struct_value = datum_to_json_object(
            &struct_field,
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris_config,
        )
        .unwrap();
        assert_eq!(struct_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        let encode_jsonb_obj_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
        };
        let json_value = encode(
            DataType::Jsonb,
            ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))),
            &encode_jsonb_obj_config,
        );
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    #[test]
    fn test_generate_json_converter_schema() {
        let field = |name: &str, data_type: DataType| Field {
            data_type,
            name: name.into(),
        };
        // Shared by the nested struct (`v10.c`) and the nested list element (`v11`).
        let int_double_struct = || {
            DataType::Struct(StructType::new(vec![
                ("aa", DataType::Int64),
                ("bb", DataType::Float64),
            ]))
        };

        let fields = vec![
            field("v1", DataType::Boolean),
            field("v2", DataType::Int16),
            field("v3", DataType::Int32),
            field("v4", DataType::Float32),
            field("v5", DataType::Decimal),
            field("v6", DataType::Date),
            field("v7", DataType::Varchar),
            field("v8", DataType::Time),
            field("v9", DataType::Interval),
            field(
                "v10",
                DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    ("c", int_double_struct()),
                ])),
            ),
            field(
                "v11",
                DataType::List(Box::new(DataType::List(Box::new(int_double_struct())))),
            ),
            field("12", DataType::Jsonb),
            field("13", DataType::Serial),
            field("14", DataType::Int256),
        ];
        let schema =
            json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
                .to_string();
        let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, ans);
    }
}