1use anyhow::Context;
16use itertools::Itertools;
17use risingwave_common::bail;
18use simd_json::BorrowedValue;
19use simd_json::prelude::{MutableObject, ValueAsScalar, ValueObjectAccess};
20
21use crate::error::ConnectorResult;
22use crate::only_parse_payload;
23use crate::parser::canal::operators::*;
24use crate::parser::unified::ChangeEventOperation;
25use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
26use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
27use crate::parser::{
28 ByteStreamSourceParser, JsonProperties, ParserFormat, SourceStreamChunkRowWriter,
29};
30use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
31
/// JSON key of the array of row objects carried by a canal message.
const DATA: &str = "data";
/// JSON key of the operation type (the canal `type` field, e.g. `INSERT`).
const OP: &str = "type";
/// JSON key of the boolean flag marking a DDL message.
const IS_DDL: &str = "isDdl";
35
/// Parser for messages in the canal JSON format (change-data-capture events
/// emitted by Alibaba Canal).
#[derive(Debug)]
pub struct CanalJsonParser {
    /// Schema of the columns to extract from each row object in `data`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context used by the `ByteStreamSourceParser` implementation.
    source_ctx: SourceContextRef,
    /// Byte offset where the JSON document begins in each payload:
    /// 5 when the schema registry is enabled (skips a 5-byte header),
    /// 0 otherwise. See `CanalJsonParser::new`.
    payload_start_idx: usize,
}
42
43impl CanalJsonParser {
44 pub fn new(
45 rw_columns: Vec<SourceColumnDesc>,
46 source_ctx: SourceContextRef,
47 config: &JsonProperties,
48 ) -> ConnectorResult<Self> {
49 Ok(Self {
50 rw_columns,
51 source_ctx,
52 payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
53 })
54 }
55
56 #[allow(clippy::unused_async)]
57 pub async fn parse_inner(
58 &self,
59 mut payload: Vec<u8>,
60 mut writer: SourceStreamChunkRowWriter<'_>,
61 ) -> ConnectorResult<()> {
62 let mut event: BorrowedValue<'_> =
63 simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
64 .context("failed to parse canal json payload")?;
65
66 let is_ddl = event
67 .get(IS_DDL)
68 .and_then(|v| v.as_bool())
69 .context("field `isDdl` not found in canal json")?;
70 if is_ddl {
71 bail!("received a DDL message, please set `canal.instance.filter.query.dml` to true.");
72 }
73
74 let op = match event.get(OP).and_then(|v| v.as_str()) {
75 Some(CANAL_INSERT_EVENT | CANAL_UPDATE_EVENT) => ChangeEventOperation::Upsert,
76 Some(CANAL_DELETE_EVENT) => ChangeEventOperation::Delete,
77 _ => bail!("op field not found in canal json"),
78 };
79
80 let events = event
81 .get_mut(DATA)
82 .and_then(|v| match v {
83 BorrowedValue::Array(array) => Some(array),
84 _ => None,
85 })
86 .context("field `data` is missing for creating event")?;
87
88 let mut errors = Vec::new();
89 for event in events.drain(..) {
90 let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::CANAL);
91 match apply_row_operation_on_stream_chunk_writer((op, accessor), &mut writer) {
92 Ok(_) => {}
93 Err(err) => errors.push(err),
94 }
95 }
96
97 if errors.is_empty() {
98 Ok(())
99 } else {
100 bail!(
102 "failed to parse {} row(s) in a single canal json message: {}",
103 errors.len(),
104 errors.iter().format(", ")
105 );
106 }
107 }
108}
109
impl ByteStreamSourceParser for CanalJsonParser {
    /// Returns the column schema this parser extracts.
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    fn parser_format(&self) -> ParserFormat {
        ParserFormat::CanalJson
    }

    // Canal JSON carries everything in the payload; the message key is
    // ignored. `only_parse_payload!` presumably forwards the payload to
    // `parse_inner` — confirm against the macro definition.
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        only_parse_payload!(self, payload, writer)
    }
}
132
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Decimal, JsonbVal, ScalarImpl, ToOwnedDatum};
    use serde_json::Value;

    use super::*;
    use crate::parser::SourceStreamChunkBuilder;
    use crate::source::SourceCtrlOpts;

    /// Parses an INSERT message covering many MySQL source types and checks
    /// that each projected column converts to the expected RisingWave datum
    /// (date, timestamp, time, varchar, bytea, jsonb, ...).
    #[tokio::test]
    async fn test_data_types() {
        let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7,"json":12},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp","json":"json"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1","json":"{\"a\": 1, \"b\": 2}"}]}"#;
        // Only a subset of the payload's fields is projected; extra fields
        // in the message must be ignored by the parser.
        let descs = vec![
            SourceColumnDesc::simple("id", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("date", DataType::Date, 1.into()),
            SourceColumnDesc::simple("datetime", DataType::Timestamp, 2.into()),
            SourceColumnDesc::simple("time", DataType::Time, 3.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 4.into()),
            SourceColumnDesc::simple("char", DataType::Varchar, 5.into()),
            SourceColumnDesc::simple("binary", DataType::Bytea, 6.into()),
            SourceColumnDesc::simple("json", DataType::Jsonb, 7.into()),
        ];
        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
        assert_eq!(
            row.datum_at(1).to_owned_datum(),
            Some(ScalarImpl::Date(
                chrono::NaiveDate::from_ymd_opt(2023, 4, 20).unwrap().into()
            ))
        );
        assert_eq!(
            row.datum_at(2).to_owned_datum(),
            Some(ScalarImpl::Timestamp(
                "2023-02-15 13:01:36".parse().unwrap()
            ))
        );
        assert_eq!(
            row.datum_at(3).to_owned_datum(),
            Some(ScalarImpl::Time(
                chrono::NaiveTime::from_hms_opt(20, 23, 41).unwrap().into()
            ))
        );
        assert_eq!(
            row.datum_at(4).to_owned_datum(),
            Some(ScalarImpl::Timestamp(
                "2022-10-13 12:12:54".parse().unwrap()
            ))
        );
        assert_eq!(
            row.datum_at(5).to_owned_datum(),
            Some(ScalarImpl::Utf8(Box::from("Kathleen".to_owned())))
        );
        // Trailing NUL bytes from the fixed-width MySQL `binary` column must
        // be preserved in the bytea value.
        assert_eq!(
            row.datum_at(6).to_owned_datum(),
            Some(ScalarImpl::Bytea(Box::from(
                "Joseph\u{0}\u{0}\u{0}\u{0}".as_bytes()
            )))
        );
        assert_eq!(
            row.datum_at(7).to_owned_datum(),
            Some(ScalarImpl::Jsonb(JsonbVal::from(Value::from(
                "{\"a\": 1, \"b\": 2}".to_owned()
            ))))
        );
    }

    /// Parses an UPDATE message; an update surfaces as an `Insert` op in the
    /// chunk because UPDATE maps to `ChangeEventOperation::Upsert`, and the
    /// new values (not the `old` image) are what gets written.
    #[tokio::test]
    async fn test_json_parser() {
        let payload = br#"{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1500.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673476000,"id":7,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":[{"balance":"1000.62"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673476732,"type":"UPDATE"}"#;

        // Note the upper-case column names: the canal parse options
        // presumably match JSON keys case-insensitively — see
        // `JsonParseOptions::CANAL`.
        let descs = vec![
            SourceColumnDesc::simple("ID", DataType::Int64, 0.into()),
            SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
            SourceColumnDesc::simple("is_adult", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("balance", DataType::Decimal, 3.into()),
            SourceColumnDesc::simple("reg_time", DataType::Timestamp, 4.into()),
            SourceColumnDesc::simple("win_rate", DataType::Float64, 5.into()),
        ];

        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();

        let mut rows = chunk.rows();

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int64(1)));
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                (Some(ScalarImpl::Utf8("mike".into())))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                (Some(ScalarImpl::Bool(false)))
            );
            // The updated balance, not the "old" value 1000.62.
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                (Some(Decimal::from_str("1500.62").unwrap().into()))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Timestamp(
                    "2018-01-01 00:00:01".parse().unwrap()
                )))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(0.65.into())))
            );
        }
    }

    /// A single canal message whose `data` array holds two rows must yield
    /// two rows in the output chunk, in order.
    #[tokio::test]
    async fn test_parse_multi_rows() {
        let payload = br#"{"data": [{"v1": "1", "v2": "2"}, {"v1": "3", "v2": "4"}], "old": null, "mysqlType":{"v1": "int", "v2": "int"}, "sqlType":{"v1": 4, "v2": 4}, "database":"demo","es":1668673394000,"id":5,"isDdl":false, "table":"demo","ts":1668673394788,"type":"INSERT"}"#;

        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int32, 1.into()),
        ];

        let parser = CanalJsonParser::new(
            descs.clone(),
            SourceContext::dummy().into(),
            &JsonProperties::default(),
        )
        .unwrap();

        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        parser
            .parse_inner(payload.to_vec(), builder.row_writer())
            .await
            .unwrap();

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();

        let mut rows = chunk.rows();

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(2)));
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(3)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(4)));
        }
    }
}
323}