risingwave_connector/sink/encoder/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use anyhow::Context;
19use base64::Engine as _;
20use base64::engine::general_purpose;
21use chrono::{DateTime, Datelike, Timelike};
22use indexmap::IndexMap;
23use itertools::Itertools;
24use risingwave_common::array::{ArrayError, ArrayResult};
25use risingwave_common::catalog::{Field, Schema};
26use risingwave_common::row::Row;
27use risingwave_common::types::{DataType, DatumRef, JsonbVal, ScalarRefImpl, ToText};
28use risingwave_common::util::iter_util::ZipEqDebug;
29use serde_json::{Map, Value, json};
30use thiserror_ext::AsReport;
31
32use super::{
33    CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
34    Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
35};
36use crate::sink::SinkError;
37
/// Formatting options consulted by `datum_to_json_object` whenever a data type has more than
/// one possible JSON representation.
pub struct JsonEncoderConfig {
    /// Representation of `DataType::Time`: an integer number of milliseconds or a
    /// `%H:%M:%S%.6f` string.
    time_handling_mode: TimeHandlingMode,
    /// Representation of `DataType::Date`: days from CE, days from the Unix epoch, or a
    /// `%Y-%m-%d` string.
    date_handling_mode: DateHandlingMode,
    /// Representation of `DataType::Timestamp`: integer milliseconds or a
    /// `%Y-%m-%d %H:%M:%S%.6f` string.
    timestamp_handling_mode: TimestampHandlingMode,
    /// Representation of `DataType::Timestamptz`: RFC 3339 UTC string, UTC string without a
    /// suffix, or integer micro-/milliseconds.
    timestamptz_handling_mode: TimestamptzHandlingMode,
    /// Sink-specific tweaks; currently affects how decimals and structs are encoded.
    custom_json_type: CustomJsonType,
    /// Representation of `DataType::Jsonb`: a JSON string or the embedded JSON value itself.
    jsonb_handling_mode: JsonbHandlingMode,
}
46
/// Row encoder producing a JSON object (`Map<String, Value>`) keyed by column name.
pub struct JsonEncoder {
    /// Schema of the rows being encoded; supplies each column's name and data type.
    schema: Schema,
    /// Optional column selection, exposed through `RowEncoder::col_indices`.
    col_indices: Option<Vec<usize>>,
    /// When set, encoded rows are wrapped into a `{"schema": .., "payload": ..}` envelope.
    kafka_connect: Option<KafkaConnectParamsRef>,
    /// Per-type formatting options.
    config: JsonEncoderConfig,
}
53
54impl JsonEncoder {
55    pub fn new(
56        schema: Schema,
57        col_indices: Option<Vec<usize>>,
58        date_handling_mode: DateHandlingMode,
59        timestamp_handling_mode: TimestampHandlingMode,
60        timestamptz_handling_mode: TimestamptzHandlingMode,
61        time_handling_mode: TimeHandlingMode,
62        jsonb_handling_mode: JsonbHandlingMode,
63    ) -> Self {
64        let config = JsonEncoderConfig {
65            time_handling_mode,
66            date_handling_mode,
67            timestamp_handling_mode,
68            timestamptz_handling_mode,
69            custom_json_type: CustomJsonType::None,
70            jsonb_handling_mode,
71        };
72        Self {
73            schema,
74            col_indices,
75            kafka_connect: None,
76            config,
77        }
78    }
79
80    pub fn new_with_es(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
81        let config = JsonEncoderConfig {
82            time_handling_mode: TimeHandlingMode::String,
83            date_handling_mode: DateHandlingMode::String,
84            timestamp_handling_mode: TimestampHandlingMode::String,
85            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
86            custom_json_type: CustomJsonType::Es,
87            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
88        };
89        Self {
90            schema,
91            col_indices,
92            kafka_connect: None,
93            config,
94        }
95    }
96
97    pub fn new_with_doris(
98        schema: Schema,
99        col_indices: Option<Vec<usize>>,
100        map: HashMap<String, u8>,
101    ) -> Self {
102        let config = JsonEncoderConfig {
103            time_handling_mode: TimeHandlingMode::Milli,
104            date_handling_mode: DateHandlingMode::String,
105            timestamp_handling_mode: TimestampHandlingMode::String,
106            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
107            custom_json_type: CustomJsonType::Doris(map),
108            jsonb_handling_mode: JsonbHandlingMode::String,
109        };
110        Self {
111            schema,
112            col_indices,
113            kafka_connect: None,
114            config,
115        }
116    }
117
118    pub fn new_with_starrocks(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
119        let config = JsonEncoderConfig {
120            time_handling_mode: TimeHandlingMode::Milli,
121            date_handling_mode: DateHandlingMode::String,
122            timestamp_handling_mode: TimestampHandlingMode::String,
123            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
124            custom_json_type: CustomJsonType::StarRocks,
125            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
126        };
127        Self {
128            schema,
129            col_indices,
130            kafka_connect: None,
131            config,
132        }
133    }
134
135    pub fn with_kafka_connect(self, kafka_connect: KafkaConnectParams) -> Self {
136        Self {
137            kafka_connect: Some(Arc::new(kafka_connect)),
138            ..self
139        }
140    }
141}
142
143impl RowEncoder for JsonEncoder {
144    type Output = Map<String, Value>;
145
146    fn schema(&self) -> &Schema {
147        &self.schema
148    }
149
150    fn col_indices(&self) -> Option<&[usize]> {
151        self.col_indices.as_ref().map(Vec::as_ref)
152    }
153
154    fn encode_cols(
155        &self,
156        row: impl Row,
157        col_indices: impl Iterator<Item = usize>,
158    ) -> Result<Self::Output> {
159        let mut mappings = Map::with_capacity(self.schema.len());
160        let col_indices = col_indices.collect_vec();
161        for idx in &col_indices {
162            let field = &self.schema[*idx];
163            let key = field.name.clone();
164            let value = datum_to_json_object(field, row.datum_at(*idx), &self.config)
165                .map_err(|e| SinkError::Encode(e.to_report_string()))?;
166            mappings.insert(key, value);
167        }
168
169        Ok(if let Some(param) = &self.kafka_connect {
170            json_converter_with_schema(
171                Value::Object(mappings),
172                param.schema_name.to_owned(),
173                col_indices.into_iter().map(|i| &self.schema[i]),
174            )
175        } else {
176            mappings
177        })
178    }
179}
180
181impl SerTo<String> for Map<String, Value> {
182    fn ser_to(self) -> Result<String> {
183        Value::Object(self).ser_to()
184    }
185}
186
187impl SerTo<String> for Value {
188    fn ser_to(self) -> Result<String> {
189        Ok(self.to_string())
190    }
191}
192
193fn datum_to_json_object(
194    field: &Field,
195    datum: DatumRef<'_>,
196    config: &JsonEncoderConfig,
197) -> ArrayResult<Value> {
198    let scalar_ref = match datum {
199        None => {
200            return Ok(Value::Null);
201        }
202        Some(datum) => datum,
203    };
204
205    let data_type = field.data_type();
206
207    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
208
209    let value = match (data_type, scalar_ref) {
210        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
211            json!(v)
212        }
213        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
214            json!(v)
215        }
216        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
217            json!(v)
218        }
219        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
220            json!(v)
221        }
222        (DataType::Serial, ScalarRefImpl::Serial(v)) => {
223            // The serial type needs to be handled as a string to prevent primary key conflicts caused by the precision issues of JSON numbers.
224            json!(format!("{:#018x}", v.into_inner()))
225        }
226        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
227            json!(f32::from(v))
228        }
229        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
230            json!(f64::from(v))
231        }
232        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
233            json!(v)
234        }
235        // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL
236        (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type {
237            CustomJsonType::Doris(map) => {
238                let s = map.get(&field.name).unwrap();
239                v.rescale(*s as u32);
240                json!(v.to_text())
241            }
242            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::StarRocks => {
243                json!(v.to_text())
244            }
245        },
246        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
247            match config.timestamptz_handling_mode {
248                TimestamptzHandlingMode::UtcString => {
249                    let parsed = v.to_datetime_utc();
250                    let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
251                    json!(v)
252                }
253                TimestamptzHandlingMode::UtcWithoutSuffix => {
254                    let parsed = v.to_datetime_utc().naive_utc();
255                    let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
256                    json!(v)
257                }
258                TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()),
259                TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()),
260            }
261        }
262        (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode {
263            TimeHandlingMode::Milli => {
264                // todo: just ignore the nanos part to avoid leap second complex
265                json!(v.0.num_seconds_from_midnight() as i64 * 1000)
266            }
267            TimeHandlingMode::String => {
268                let a = v.0.format("%H:%M:%S%.6f").to_string();
269                json!(a)
270            }
271        },
272        (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode {
273            DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()),
274            DateHandlingMode::FromEpoch => {
275                let duration = v.0 - DateTime::UNIX_EPOCH.date_naive();
276                json!(duration.num_days())
277            }
278            DateHandlingMode::String => {
279                let a = v.0.format("%Y-%m-%d").to_string();
280                json!(a)
281            }
282        },
283        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => {
284            match config.timestamp_handling_mode {
285                TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()),
286                TimestampHandlingMode::String => {
287                    json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
288                }
289            }
290        }
291        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
292            json!(general_purpose::STANDARD.encode(v))
293        }
294        // P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S
295        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
296            json!(v.as_iso_8601())
297        }
298
299        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
300            JsonbHandlingMode::String => {
301                json!(jsonb_ref.to_string())
302            }
303            JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
304        },
305        (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
306            let elems = list_ref.iter();
307            let mut vec = Vec::with_capacity(elems.len());
308            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
309            for sub_datum_ref in elems {
310                let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?;
311                vec.push(value);
312            }
313            json!(vec)
314        }
315        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
316            match config.custom_json_type {
317                CustomJsonType::Doris(_) => {
318                    // We need to ensure that the order of elements in the json matches the insertion order.
319                    let mut map = IndexMap::with_capacity(st.len());
320                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
321                        st.iter()
322                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
323                    ) {
324                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
325                        map.insert(sub_field.name.clone(), value);
326                    }
327                    Value::String(
328                        serde_json::to_string(&map).context("failed to serialize into JSON")?,
329                    )
330                }
331                CustomJsonType::StarRocks => {
332                    return Err(ArrayError::internal(
333                        "starrocks can't support struct".to_owned(),
334                    ));
335                }
336                CustomJsonType::Es | CustomJsonType::None => {
337                    let mut map = Map::with_capacity(st.len());
338                    for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
339                        st.iter()
340                            .map(|(name, dt)| Field::with_name(dt.clone(), name)),
341                    ) {
342                        let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?;
343                        map.insert(sub_field.name.clone(), value);
344                    }
345                    json!(map)
346                }
347            }
348        }
349        // TODO(map): support map
350        (data_type, scalar_ref) => {
351            return Err(ArrayError::internal(format!(
352                "datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}",
353                field.name, data_type, scalar_ref
354            )));
355        }
356    };
357
358    Ok(value)
359}
360
361fn json_converter_with_schema<'a>(
362    object: Value,
363    name: String,
364    fields: impl Iterator<Item = &'a Field>,
365) -> Map<String, Value> {
366    let mut mapping = Map::with_capacity(2);
367    mapping.insert(
368        "schema".to_owned(),
369        json!({
370            "type": "struct",
371            "fields": fields.map(|field| {
372                let mut mapping = type_as_json_schema(&field.data_type);
373                mapping.insert("field".to_owned(), json!(field.name));
374                mapping
375            }).collect_vec(),
376            "optional": false,
377            "name": name,
378        }),
379    );
380    mapping.insert("payload".to_owned(), object);
381    mapping
382}
383
384// reference: https://github.com/apache/kafka/blob/80982c4ae3fe6be127b48ec09caff11ab5f87c69/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSchema.java#L39
385pub(crate) fn schema_type_mapping(rw_type: &DataType) -> &'static str {
386    match rw_type {
387        DataType::Boolean => "boolean",
388        DataType::Int16 => "int16",
389        DataType::Int32 => "int32",
390        DataType::Int64 => "int64",
391        DataType::Float32 => "float",
392        DataType::Float64 => "double",
393        DataType::Decimal => "string",
394        DataType::Date => "int32",
395        DataType::Varchar => "string",
396        DataType::Time => "int64",
397        DataType::Timestamp => "int64",
398        DataType::Timestamptz => "string",
399        DataType::Interval => "string",
400        DataType::Struct(_) => "struct",
401        DataType::List(_) => "array",
402        DataType::Bytea => "bytes",
403        DataType::Jsonb => "string",
404        DataType::Serial => "string",
405        DataType::Int256 => "string",
406        DataType::Map(_) => "map",
407        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
408    }
409}
410
411fn type_as_json_schema(rw_type: &DataType) -> Map<String, Value> {
412    let mut mapping = Map::with_capacity(4); // type + optional + fields/items + field
413    mapping.insert("type".to_owned(), json!(schema_type_mapping(rw_type)));
414    mapping.insert("optional".to_owned(), json!(true));
415    match rw_type {
416        DataType::Struct(struct_type) => {
417            let sub_fields = struct_type
418                .iter()
419                .map(|(sub_name, sub_type)| {
420                    let mut sub_mapping = type_as_json_schema(sub_type);
421                    sub_mapping.insert("field".to_owned(), json!(sub_name));
422                    sub_mapping
423                })
424                .collect_vec();
425            mapping.insert("fields".to_owned(), json!(sub_fields));
426        }
427        DataType::List(sub_type) => {
428            mapping.insert("items".to_owned(), json!(type_as_json_schema(sub_type)));
429        }
430        _ => {}
431    }
432
433    mapping
434}
435
// Unit tests for the per-datum JSON conversion and the schema envelope generation.
#[cfg(test)]
mod tests {
    use risingwave_common::types::{
        Date, Decimal, Interval, Scalar, ScalarImpl, StructRef, StructType, StructValue, Time,
        Timestamp,
    };

    use super::*;

    // Exercises `datum_to_json_object` for scalar, temporal, decimal, struct and jsonb datums
    // under several handling-mode configurations.
    #[test]
    fn test_to_json_basic_type() {
        // Template field; each case overrides `data_type` (and sometimes `name`).
        let mock_field = Field {
            data_type: DataType::Boolean,
            name: Default::default(),
        };

        // Baseline configuration used by most cases below.
        let config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let boolean_value = datum_to_json_object(
            &Field {
                data_type: DataType::Boolean,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(boolean_value, json!(false));

        let int16_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int16,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(int16_value, json!(16));

        // i64::MAX must survive as an exact JSON number.
        let int64_value = datum_to_json_object(
            &Field {
                data_type: DataType::Int64,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&int64_value).unwrap(),
            i64::MAX.to_string()
        );

        // Serial values are emitted as a hexadecimal string rather than a number.
        let serial_value = datum_to_json_object(
            &Field {
                data_type: DataType::Serial,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(
            serde_json::to_string(&serial_value).unwrap(),
            format!("\"{:#018x}\"", i64::MAX)
        );

        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z");

        // Same instant, rendered in UTC without the `T` separator and `Z` suffix.
        let unix_wo_suffix_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::Milli,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };

        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
        let tstz_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamptz,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
            &unix_wo_suffix_config,
        )
        .unwrap();
        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");

        // Timestamp as epoch milliseconds: 1000 seconds -> 1_000_000 ms.
        let timestamp_milli_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromCe,
            timestamp_handling_mode: TimestampHandlingMode::Milli,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &timestamp_milli_config,
        )
        .unwrap();
        assert_eq!(ts_value, json!(1000 * 1000));

        // The same timestamp rendered as a string by the baseline configuration.
        let ts_value = datum_to_json_object(
            &Field {
                data_type: DataType::Timestamp,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_owned()));

        // Represents the number of milliseconds past midnight, org.apache.kafka.connect.data.Time
        let time_value = datum_to_json_object(
            &Field {
                data_type: DataType::Time,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(time_value, json!(1000 * 1000));

        // 13 months, 2 days and 1_000_000 microseconds -> ISO 8601 duration string.
        let interval_value = datum_to_json_object(
            &Field {
                data_type: DataType::Interval,
                ..mock_field.clone()
            },
            Some(
                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
                    .as_scalar_ref_impl(),
            ),
            &config,
        )
        .unwrap();
        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));

        // Doris decimals are rescaled to the scale registered for the column name ("aaa" -> 5).
        let mut map = HashMap::default();
        map.insert("aaa".to_owned(), 5_u8);
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(map),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let decimal = datum_to_json_object(
            &Field {
                data_type: DataType::Decimal,
                name: "aaa".to_owned(),
            },
            Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(decimal, json!("1.11111"));

        // Days counted from the Common Era: 1970-01-01 is day 719163.
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &config,
        )
        .unwrap();
        assert_eq!(date_value, json!(719163));

        // Days counted from the Unix epoch: 1970-01-01 is day 0.
        let from_epoch_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::FromEpoch,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()),
            &from_epoch_config,
        )
        .unwrap();
        assert_eq!(date_value, json!(0));

        // Doris configuration without any decimal scales; used for the date and struct cases.
        let doris_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::Doris(HashMap::default()),
            jsonb_handling_mode: JsonbHandlingMode::String,
        };
        let date_value = datum_to_json_object(
            &Field {
                data_type: DataType::Date,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()),
            &doris_config,
        )
        .unwrap();
        assert_eq!(date_value, json!("2010-10-10"));

        let value = StructValue::new(vec![
            Some(3_i32.to_scalar_value()),
            Some(2_i32.to_scalar_value()),
            Some(1_i32.to_scalar_value()),
        ]);

        // Doris structs become a JSON string whose keys keep the declaration order
        // (v3, v2, v1) instead of being sorted. Note that the binding below holds the
        // struct result despite its name.
        let interval_value = datum_to_json_object(
            &Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("v3", DataType::Int32),
                    ("v2", DataType::Int32),
                    ("v1", DataType::Int32),
                ])),
                ..mock_field.clone()
            },
            Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })),
            &doris_config,
        )
        .unwrap();
        assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

        // Dynamic jsonb handling embeds the JSON value itself rather than its string form.
        let encode_jsonb_obj_config = JsonEncoderConfig {
            time_handling_mode: TimeHandlingMode::String,
            date_handling_mode: DateHandlingMode::String,
            timestamp_handling_mode: TimestampHandlingMode::String,
            timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
            custom_json_type: CustomJsonType::None,
            jsonb_handling_mode: JsonbHandlingMode::Dynamic,
        };
        let json_value = datum_to_json_object(
            &Field {
                data_type: DataType::Jsonb,
                ..mock_field.clone()
            },
            Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
            &encode_jsonb_obj_config,
        )
        .unwrap();
        assert_eq!(json_value, json!([1, 2, 3]));
    }

    // Checks the `"schema"` part of the envelope produced by `json_converter_with_schema`
    // for flat, nested-struct and nested-list fields.
    #[test]
    fn test_generate_json_converter_schema() {
        let fields = vec![
            Field {
                data_type: DataType::Boolean,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Int16,
                name: "v2".into(),
            },
            Field {
                data_type: DataType::Int32,
                name: "v3".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v4".into(),
            },
            Field {
                data_type: DataType::Decimal,
                name: "v5".into(),
            },
            Field {
                data_type: DataType::Date,
                name: "v6".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "v7".into(),
            },
            Field {
                data_type: DataType::Time,
                name: "v8".into(),
            },
            Field {
                data_type: DataType::Interval,
                name: "v9".into(),
            },
            Field {
                data_type: DataType::Struct(StructType::new(vec![
                    ("a", DataType::Timestamp),
                    ("b", DataType::Timestamptz),
                    (
                        "c",
                        DataType::Struct(StructType::new(vec![
                            ("aa", DataType::Int64),
                            ("bb", DataType::Float64),
                        ])),
                    ),
                ])),
                name: "v10".into(),
            },
            Field {
                data_type: DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
                    StructType::new(vec![("aa", DataType::Int64), ("bb", DataType::Float64)]),
                ))))),
                name: "v11".into(),
            },
            Field {
                data_type: DataType::Jsonb,
                name: "12".into(),
            },
            Field {
                data_type: DataType::Serial,
                name: "13".into(),
            },
            Field {
                data_type: DataType::Int256,
                name: "14".into(),
            },
        ];
        // Only the schema half of the envelope is compared; the payload is an empty object.
        let schema =
            json_converter_with_schema(json!({}), "test".to_owned(), fields.iter())["schema"]
                .to_string();
        let ans = r#"{"fields":[{"field":"v1","optional":true,"type":"boolean"},{"field":"v2","optional":true,"type":"int16"},{"field":"v3","optional":true,"type":"int32"},{"field":"v4","optional":true,"type":"float"},{"field":"v5","optional":true,"type":"string"},{"field":"v6","optional":true,"type":"int32"},{"field":"v7","optional":true,"type":"string"},{"field":"v8","optional":true,"type":"int64"},{"field":"v9","optional":true,"type":"string"},{"field":"v10","fields":[{"field":"a","optional":true,"type":"int64"},{"field":"b","optional":true,"type":"string"},{"field":"c","fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"}],"optional":true,"type":"struct"},{"field":"v11","items":{"items":{"fields":[{"field":"aa","optional":true,"type":"int64"},{"field":"bb","optional":true,"type":"double"}],"optional":true,"type":"struct"},"optional":true,"type":"array"},"optional":true,"type":"array"},{"field":"12","optional":true,"type":"string"},{"field":"13","optional":true,"type":"string"},{"field":"14","optional":true,"type":"string"}],"name":"test","optional":false,"type":"struct"}"#;
        assert_eq!(schema, ans);
    }
}