risingwave_connector/parser/canal/simd_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use risingwave_common::bail;
18use simd_json::BorrowedValue;
19use simd_json::prelude::{MutableObject, ValueAsScalar, ValueObjectAccess};
20
21use crate::error::ConnectorResult;
22use crate::only_parse_payload;
23use crate::parser::canal::operators::*;
24use crate::parser::unified::ChangeEventOperation;
25use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
26use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
27use crate::parser::{
28    ByteStreamSourceParser, JsonProperties, ParserFormat, SourceStreamChunkRowWriter,
29};
30use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
31
/// Key of the array of row images in a canal json message.
const DATA: &str = "data";
/// Key of the operation type (e.g. `INSERT` / `UPDATE` / `DELETE`).
const OP: &str = "type";
/// Key of the boolean flag marking a DDL message.
const IS_DDL: &str = "isDdl";
35
/// Parser for messages in the canal json format, decoded with `simd_json`.
#[derive(Debug)]
pub struct CanalJsonParser {
    /// Descriptors of the columns to extract from each row image.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    source_ctx: SourceContextRef,
    /// Byte offset where the JSON document starts within the payload
    /// (non-zero when a schema-registry header precedes the JSON).
    payload_start_idx: usize,
}
42
43impl CanalJsonParser {
44    pub fn new(
45        rw_columns: Vec<SourceColumnDesc>,
46        source_ctx: SourceContextRef,
47        config: &JsonProperties,
48    ) -> ConnectorResult<Self> {
49        Ok(Self {
50            rw_columns,
51            source_ctx,
52            payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
53        })
54    }
55
56    #[allow(clippy::unused_async)]
57    pub async fn parse_inner(
58        &self,
59        mut payload: Vec<u8>,
60        mut writer: SourceStreamChunkRowWriter<'_>,
61    ) -> ConnectorResult<()> {
62        let mut event: BorrowedValue<'_> =
63            simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
64                .context("failed to parse canal json payload")?;
65
66        let is_ddl = event
67            .get(IS_DDL)
68            .and_then(|v| v.as_bool())
69            .context("field `isDdl` not found in canal json")?;
70        if is_ddl {
71            bail!("received a DDL message, please set `canal.instance.filter.query.dml` to true.");
72        }
73
74        let op = match event.get(OP).and_then(|v| v.as_str()) {
75            Some(CANAL_INSERT_EVENT | CANAL_UPDATE_EVENT) => ChangeEventOperation::Upsert,
76            Some(CANAL_DELETE_EVENT) => ChangeEventOperation::Delete,
77            _ => bail!("op field not found in canal json"),
78        };
79
80        let events = event
81            .get_mut(DATA)
82            .and_then(|v| match v {
83                BorrowedValue::Array(array) => Some(array),
84                _ => None,
85            })
86            .context("field `data` is missing for creating event")?;
87
88        let mut errors = Vec::new();
89        for event in events.drain(..) {
90            let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::CANAL);
91            match apply_row_operation_on_stream_chunk_writer((op, accessor), &mut writer) {
92                Ok(_) => {}
93                Err(err) => errors.push(err),
94            }
95        }
96
97        if errors.is_empty() {
98            Ok(())
99        } else {
100            // TODO(error-handling): multiple errors
101            bail!(
102                "failed to parse {} row(s) in a single canal json message: {}",
103                errors.len(),
104                errors.iter().format(", ")
105            );
106        }
107    }
108}
109
impl ByteStreamSourceParser for CanalJsonParser {
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    fn parser_format(&self) -> ParserFormat {
        ParserFormat::CanalJson
    }

    // Canal json messages have no meaningful key part, so the key is ignored
    // and only the payload is parsed (via `parse_inner`).
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        only_parse_payload!(self, payload, writer)
    }
}
132
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Decimal, JsonbVal, ScalarImpl, ToOwnedDatum};
    use serde_json::Value;

    use super::*;
    use crate::parser::SourceStreamChunkBuilder;
    use crate::source::SourceCtrlOpts;

    // Parses a single-row INSERT message exercising a wide range of MySQL
    // types and checks each column's decoded datum.
    #[tokio::test]
    async fn test_data_types() {
        let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7,"json":12},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp","json":"json"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1","json":"{\"a\": 1, \"b\": 2}"}]}"#;
        let descs = vec![
            SourceColumnDesc::simple("id", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("date", DataType::Date, 1.into()),
            SourceColumnDesc::simple("datetime", DataType::Timestamp, 2.into()),
            SourceColumnDesc::simple("time", DataType::Time, 3.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 4.into()),
            SourceColumnDesc::simple("char", DataType::Varchar, 5.into()),
            SourceColumnDesc::simple("binary", DataType::Bytea, 6.into()),
            SourceColumnDesc::simple("json", DataType::Jsonb, 7.into()),
        ];
        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        let (op, row) = chunk.rows().next().unwrap();
        // An INSERT event is written as an Insert op.
        assert_eq!(op, Op::Insert);
        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Date(
                chrono::NaiveDate::from_ymd_opt(2023, 4, 20).unwrap().into()
            ))
        );
        assert_eq!(
            row.datum_at(2).to_owned_datum(),
            Some(ScalarImpl::Timestamp(
                "2023-02-15 13:01:36".parse().unwrap()
            ))
        );
        assert_eq!(
            row.datum_at(3).to_owned_datum(),
            Some(ScalarImpl::Time(
                chrono::NaiveTime::from_hms_opt(20, 23, 41).unwrap().into()
            ))
        );
        assert_eq!(
            row.datum_at(4).to_owned_datum(),
            Some(ScalarImpl::Timestamp(
                "2022-10-13 12:12:54".parse().unwrap()
            ))
        );
        assert_eq!(
            row.datum_at(5).to_owned_datum(),
            Some(ScalarImpl::Utf8(Box::from("Kathleen".to_owned())))
        );
        // The binary column keeps its trailing NUL padding bytes.
        assert_eq!(
            row.datum_at(6).to_owned_datum(),
            Some(ScalarImpl::Bytea(Box::from(
                "Joseph\u{0}\u{0}\u{0}\u{0}".as_bytes()
            )))
        );
        assert_eq!(
            row.datum_at(7).to_owned_datum(),
            Some(ScalarImpl::Jsonb(JsonbVal::from(Value::from(
                "{\"a\": 1, \"b\": 2}".to_owned()
            ))))
        );
    }

    // Parses an UPDATE message. Column descriptors deliberately use a
    // different case ("ID"/"NAME") than the JSON keys — presumably the CANAL
    // parse options match keys case-insensitively (TODO confirm against
    // `JsonParseOptions::CANAL`).
    #[tokio::test]
    async fn test_json_parser() {
        let payload = br#"{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1500.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673476000,"id":7,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":[{"balance":"1000.62"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673476732,"type":"UPDATE"}"#;

        let descs = vec![
            SourceColumnDesc::simple("ID", DataType::Int64, 0.into()),
            SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
            SourceColumnDesc::simple("is_adult", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("balance", DataType::Decimal, 3.into()),
            SourceColumnDesc::simple("reg_time", DataType::Timestamp, 4.into()),
            SourceColumnDesc::simple("win_rate", DataType::Float64, 5.into()),
        ];

        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();

        let mut rows = chunk.rows();

        {
            let (op, row) = rows.next().unwrap();
            // UPDATE is handled as an upsert, which surfaces as Insert here.
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int64(1)));
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                (Some(ScalarImpl::Utf8("mike".into())))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                (Some(ScalarImpl::Bool(false)))
            );
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                (Some(Decimal::from_str("1500.62").unwrap().into()))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Timestamp(
                    "2018-01-01 00:00:01".parse().unwrap()
                )))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(0.65.into())))
            );
        }
    }

    // A single message whose `data` array holds two rows must yield two
    // Insert operations in order.
    #[tokio::test]
    async fn test_parse_multi_rows() {
        let payload = br#"{"data": [{"v1": "1", "v2": "2"}, {"v1": "3", "v2": "4"}], "old": null, "mysqlType":{"v1": "int", "v2": "int"}, "sqlType":{"v1": 4, "v2": 4}, "database":"demo","es":1668673394000,"id":5,"isDdl":false, "table":"demo","ts":1668673394788,"type":"INSERT"}"#;

        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int32, 1.into()),
        ];

        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();

        let mut rows = chunk.rows();

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(2)));
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(3)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(4)));
        }
    }
}