1use std::time::{SystemTime, UNIX_EPOCH};
16
17use risingwave_common::array::Op;
18use risingwave_common::catalog::{Field, Schema};
19use serde_json::{Map, Value, json};
20use tracing::warn;
21
22use super::{Result, SinkFormatter, StreamChunk};
23use crate::sink::encoder::{
24 DateHandlingMode, JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode,
25 TimestampHandlingMode, TimestamptzHandlingMode,
26};
27use crate::tri;
28
// Prefix for fully-qualified Debezium schema names built by
// `concat_debezium_name_field`: `RisingWave.<db>.<table>.<suffix>`.
const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";
30
/// Options controlling how Debezium-style change events are emitted.
pub struct DebeziumAdapterOpts {
    // When true, each delete event is followed by a tombstone record
    // (the same key with a `None` value), so log-compacted topics can
    // eventually drop the deleted key.
    gen_tombstone: bool,
}
34
35impl Default for DebeziumAdapterOpts {
36 fn default() -> Self {
37 Self {
38 gen_tombstone: true,
39 }
40 }
41}
42
43fn concat_debezium_name_field(db_name: &str, sink_from_name: &str, value: &str) -> String {
44 DEBEZIUM_NAME_FIELD_PREFIX.to_owned() + "." + db_name + "." + sink_from_name + "." + value
45}
46
/// Formats stream chunks into Debezium-style JSON change events
/// (key/value pairs with `schema` + `payload` envelopes).
pub struct DebeziumJsonFormatter {
    // Full schema of the sink's rows; used for the value envelope.
    schema: Schema,
    // Indices of the primary-key columns within `schema`; used for the key.
    pk_indices: Vec<usize>,
    // Database name placed in the `source` field and schema names.
    db_name: String,
    // Table name placed in the `source` field and schema names.
    sink_from_name: String,
    opts: DebeziumAdapterOpts,
    // Encodes only the primary-key columns (event key payload).
    key_encoder: JsonEncoder,
    // Encodes all columns (event value `before`/`after` payloads).
    val_encoder: JsonEncoder,
}
56
57impl DebeziumJsonFormatter {
58 pub fn new(
59 schema: Schema,
60 pk_indices: Vec<usize>,
61 db_name: String,
62 sink_from_name: String,
63 opts: DebeziumAdapterOpts,
64 ) -> Self {
65 let key_encoder = JsonEncoder::new(
66 schema.clone(),
67 Some(pk_indices.clone()),
68 DateHandlingMode::FromEpoch,
69 TimestampHandlingMode::Milli,
70 TimestamptzHandlingMode::UtcString,
71 TimeHandlingMode::Milli,
72 JsonbHandlingMode::String,
73 );
74 let val_encoder = JsonEncoder::new(
75 schema.clone(),
76 None,
77 DateHandlingMode::FromEpoch,
78 TimestampHandlingMode::Milli,
79 TimestamptzHandlingMode::UtcString,
80 TimeHandlingMode::Milli,
81 JsonbHandlingMode::String,
82 );
83 Self {
84 schema,
85 pk_indices,
86 db_name,
87 sink_from_name,
88 opts,
89 key_encoder,
90 val_encoder,
91 }
92 }
93}
94
impl SinkFormatter for DebeziumJsonFormatter {
    type K = Value;
    type V = Value;

    /// Lazily formats `chunk` into Debezium change-event `(key, value)` pairs.
    ///
    /// Per operation:
    /// - `Insert` yields an envelope with `op: "c"` and `before: null`.
    /// - `Delete` yields `op: "d"` with `after: null`; when
    ///   `opts.gen_tombstone` is set, a `(key, None)` tombstone follows.
    /// - `UpdateDelete` only caches the encoded old row and yields nothing.
    /// - `UpdateInsert` combines the cached `before` image with the new row
    ///   into a single `op: "u"` envelope; if no `UpdateDelete` preceded it,
    ///   the row is skipped with a warning.
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Value>, Option<Value>)>> {
        std::iter::from_coroutine(
            #[coroutine]
            || {
                let DebeziumJsonFormatter {
                    schema,
                    pk_indices,
                    db_name,
                    sink_from_name,
                    opts,
                    key_encoder,
                    val_encoder,
                } = self;
                // One wall-clock timestamp, shared by every event emitted
                // for this chunk.
                let ts_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_millis() as u64;
                let source_field = json!({
                    "db": db_name,
                    "table": sink_from_name,
                    "ts_ms": ts_ms,
                });

                // Encoded `before` image carried from an `UpdateDelete` to
                // the `UpdateInsert` expected to follow it.
                let mut update_cache: Option<Map<String, Value>> = None;

                for (op, row) in chunk.rows() {
                    // Event key: schema + payload restricted to the
                    // primary-key columns.
                    let event_key_object: Option<Value> = Some(json!({
                        "schema": json!({
                            "type": "struct",
                            "fields": fields_pk_to_json(&schema.fields, pk_indices),
                            "optional": false,
                            "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
                        }),
                        "payload": tri!(key_encoder.encode(row)),
                    }));
                    let event_object: Option<Value> = match op {
                        Op::Insert => Some(json!({
                            "schema": schema_to_json(schema, db_name, sink_from_name),
                            "payload": {
                                "before": null,
                                "after": tri!(val_encoder.encode(row)),
                                "op": "c",
                                "ts_ms": ts_ms,
                                "source": source_field,
                            }
                        })),
                        Op::Delete => {
                            let value_obj = Some(json!({
                                "schema": schema_to_json(schema, db_name, sink_from_name),
                                "payload": {
                                    "before": tri!(val_encoder.encode(row)),
                                    "after": null,
                                    "op": "d",
                                    "ts_ms": ts_ms,
                                    "source": source_field,
                                }
                            }));
                            yield Ok((event_key_object.clone(), value_obj));

                            if opts.gen_tombstone {
                                // Same key with a `None` value, so
                                // log-compacted topics can drop the key.
                                yield Ok((event_key_object, None));
                            }

                            continue;
                        }
                        Op::UpdateDelete => {
                            // Defer emission until the paired UpdateInsert.
                            update_cache = Some(tri!(val_encoder.encode(row)));
                            continue;
                        }
                        Op::UpdateInsert => {
                            if let Some(before) = update_cache.take() {
                                Some(json!({
                                    "schema": schema_to_json(schema, db_name, sink_from_name),
                                    "payload": {
                                        "before": before,
                                        "after": tri!(val_encoder.encode(row)),
                                        "op": "u",
                                        "ts_ms": ts_ms,
                                        "source": source_field,
                                    }
                                }))
                            } else {
                                // Unpaired UpdateInsert: nothing valid to
                                // emit, so skip the row entirely.
                                warn!(
                                    "not found UpdateDelete in prev row, skipping, row index {:?}",
                                    row.index()
                                );
                                continue;
                            }
                        }
                    };
                    yield Ok((event_key_object, event_object));
                }
            },
        )
    }
}
202
203pub(crate) fn schema_to_json(schema: &Schema, db_name: &str, sink_from_name: &str) -> Value {
204 let mut schema_fields = Vec::new();
205 schema_fields.push(json!({
206 "type": "struct",
207 "fields": fields_to_json(&schema.fields),
208 "optional": true,
209 "field": "before",
210 "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
211 }));
212 schema_fields.push(json!({
213 "type": "struct",
214 "fields": fields_to_json(&schema.fields),
215 "optional": true,
216 "field": "after",
217 "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
218 }));
219
220 schema_fields.push(json!({
221 "type": "struct",
222 "optional": false,
223 "name": concat_debezium_name_field(db_name, sink_from_name, "Source"),
224 "fields": vec![
225 json!({
226 "type": "string",
227 "optional": false,
228 "field": "db"
229 }),
230 json!({
231 "type": "string",
232 "optional": true,
233 "field": "table"
234 }),
235 json!({
236 "type": "int64",
237 "optional": false,
238 "field": "ts_ms"
239 }),
240 ],
241
242 "field": "source"
243 }));
244 schema_fields.push(json!({
245 "type": "string",
246 "optional": false,
247 "field": "op"
248 }));
249 schema_fields.push(json!({
250 "type": "int64",
251 "optional": false,
252 "field": "ts_ms"
253 }));
254
255 json!({
256 "type": "struct",
257 "fields": schema_fields,
258 "optional": false,
259 "name": concat_debezium_name_field(db_name, sink_from_name, "Envelope"),
260 })
261}
262
263pub(crate) fn fields_pk_to_json(fields: &[Field], pk_indices: &[usize]) -> Value {
264 let mut res = Vec::new();
265 for idx in pk_indices {
266 res.push(field_to_json(&fields[*idx]));
267 }
268 json!(res)
269}
270
271pub(crate) fn fields_to_json(fields: &[Field]) -> Value {
272 let mut res = Vec::new();
273
274 fields
275 .iter()
276 .for_each(|field| res.push(field_to_json(field)));
277
278 json!(res)
279}
280
281pub(crate) fn field_to_json(field: &Field) -> Value {
282 let (r#type, name) = match field.data_type() {
284 risingwave_common::types::DataType::Boolean => ("boolean", ""),
285 risingwave_common::types::DataType::Int16 => ("int16", ""),
286 risingwave_common::types::DataType::Int32 => ("int32", ""),
287 risingwave_common::types::DataType::Int64 => ("int64", ""),
288 risingwave_common::types::DataType::Int256 => ("string", ""),
289 risingwave_common::types::DataType::Float32 => ("float", ""),
290 risingwave_common::types::DataType::Float64 => ("double", ""),
291 risingwave_common::types::DataType::Decimal => ("string", ""),
294
295 risingwave_common::types::DataType::Varchar => ("string", ""),
296
297 risingwave_common::types::DataType::Date => ("int32", "org.apache.kafka.connect.data.Date"),
299 risingwave_common::types::DataType::Time => ("int64", "org.apache.kafka.connect.data.Time"),
301 risingwave_common::types::DataType::Timestamp => {
302 ("int64", "org.apache.kafka.connect.data.Timestamp")
303 }
304
305 risingwave_common::types::DataType::Timestamptz => {
306 ("string", "io.debezium.time.ZonedTimestamp")
307 }
308 risingwave_common::types::DataType::Interval => ("string", "io.debezium.time.Interval"),
309
310 risingwave_common::types::DataType::Bytea => ("bytes", ""),
311 risingwave_common::types::DataType::Jsonb => ("string", "io.debezium.data.Json"),
312 risingwave_common::types::DataType::Serial => ("int32", ""),
313 risingwave_common::types::DataType::Struct(_) => ("string", ""),
316 risingwave_common::types::DataType::List { .. } => ("string", ""),
317 risingwave_common::types::DataType::Map(_) => ("string", ""),
318 };
319
320 if name.is_empty() {
321 json!({
322 "field": field.name,
323 "optional": true,
324 "type": r#type
325 })
326 } else {
327 json!({
328 "field": field.name,
329 "optional": true,
330 "type": r#type,
331 "name": name
332 })
333 }
334}
335
#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::sink::utils::chunk_to_json;

    // Expected envelope schema for a (v1 int32, v2 float, v3 struct) table
    // in `test_db`.`test_table`, as produced by `schema_to_json`.
    const SCHEMA_JSON_RESULT: &str = r#"{"fields":[{"field":"before","fields":[{"field":"v1","optional":true,"type":"int32"},{"field":"v2","optional":true,"type":"float"},{"field":"v3","optional":true,"type":"string"}],"name":"RisingWave.test_db.test_table.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"v1","optional":true,"type":"int32"},{"field":"v2","optional":true,"type":"float"},{"field":"v3","optional":true,"type":"string"}],"name":"RisingWave.test_db.test_table.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.test_db.test_table.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.test_db.test_table.Envelope","optional":false,"type":"struct"}"#;

    /// Checks that `schema_to_json` produces the expected envelope schema and
    /// that the JSON encoder renders a struct-typed column as a nested object.
    #[test]
    fn test_chunk_to_json() -> Result<()> {
        // Ten insert-only rows of (int, float, struct<int, float>).
        let chunk = StreamChunk::from_pretty(
            " i f <i,f>
            + 0 0.0 (0,0.0)
            + 1 1.0 (1,1.0)
            + 2 2.0 (2,2.0)
            + 3 3.0 (3,3.0)
            + 4 4.0 (4,4.0)
            + 5 5.0 (5,5.0)
            + 6 6.0 (6,6.0)
            + 7 7.0 (7,7.0)
            + 8 8.0 (8,8.0)
            + 9 9.0 (9,9.0)",
        );

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v2".into(),
            },
            Field {
                data_type: StructType::new(vec![
                    ("v4", DataType::Int32),
                    ("v5", DataType::Float32),
                ])
                .into(),
                name: "v3".into(),
            },
        ]);

        // Same handling modes as the formatter's value encoder.
        let encoder = JsonEncoder::new(
            schema.clone(),
            None,
            DateHandlingMode::FromEpoch,
            TimestampHandlingMode::Milli,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::Milli,
            JsonbHandlingMode::String,
        );
        let json_chunk = chunk_to_json(chunk, &encoder).unwrap();
        let schema_json = schema_to_json(&schema, "test_db", "test_table");
        // Compare as parsed Values so JSON key ordering is irrelevant.
        assert_eq!(
            schema_json,
            serde_json::from_str::<Value>(SCHEMA_JSON_RESULT).unwrap()
        );
        assert_eq!(
            serde_json::from_str::<Value>(&json_chunk[0]).unwrap(),
            serde_json::from_str::<Value>("{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}")
                .unwrap()
        );

        Ok(())
    }
}