risingwave_connector/sink/formatter/debezium_json.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::{SystemTime, UNIX_EPOCH};

use risingwave_common::array::Op;
use risingwave_common::catalog::{Field, Schema};
use serde_json::{Map, Value, json};
use tracing::warn;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::{
    DateHandlingMode, JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode,
    TimestampHandlingMode, TimestamptzHandlingMode,
};
use crate::tri;

const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";

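/// Options for the Debezium JSON formatter. `gen_tombstone` controls whether a
/// Kafka-style tombstone event (the key with a `None` value) is emitted after
/// each delete event, matching Debezium's delete-event behavior.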
pub struct DebeziumAdapterOpts {
    gen_tombstone: bool,
}

impl Default for DebeziumAdapterOpts {
    fn default() -> Self {
        Self {
            gen_tombstone: true,
        }
    }
}

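/// Builds a Debezium-style fully qualified schema name:
/// `RisingWave.<db_name>.<sink_from_name>.<value>`.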
fn concat_debezium_name_field(db_name: &str, sink_from_name: &str, value: &str) -> String {
    DEBEZIUM_NAME_FIELD_PREFIX.to_owned() + "." + db_name + "." + sink_from_name + "." + value
}

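/// Formats stream chunks into Debezium-envelope JSON key/value pairs.
/// `key_encoder` encodes only the primary-key columns (the event key), while
/// `val_encoder` encodes all columns for the `before`/`after` payload images.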
pub struct DebeziumJsonFormatter {
    schema: Schema,
    pk_indices: Vec<usize>,
    db_name: String,
    sink_from_name: String,
    opts: DebeziumAdapterOpts,
    key_encoder: JsonEncoder,
    val_encoder: JsonEncoder,
}

impl DebeziumJsonFormatter {
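    /// A minimal construction sketch (illustrative only: the schema, field
    /// names, and database/table names below are assumptions, not values taken
    /// from a real sink definition):
    ///
    /// ```ignore
    /// let schema = Schema::new(vec![Field {
    ///     data_type: DataType::Int32,
    ///     name: "id".into(),
    /// }]);
    /// let formatter = DebeziumJsonFormatter::new(
    ///     schema,
    ///     vec![0],                        // primary-key column indices
    ///     "test_db".to_owned(),           // reported as `source.db`
    ///     "test_table".to_owned(),        // reported as `source.table`
    ///     DebeziumAdapterOpts::default(), // emit tombstones after deletes
    /// );
    /// ```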
    pub fn new(
        schema: Schema,
        pk_indices: Vec<usize>,
        db_name: String,
        sink_from_name: String,
        opts: DebeziumAdapterOpts,
    ) -> Self {
        let key_encoder = JsonEncoder::new(
            schema.clone(),
            Some(pk_indices.clone()),
            DateHandlingMode::FromEpoch,
            TimestampHandlingMode::Milli,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::Milli,
            JsonbHandlingMode::String,
        );
        let val_encoder = JsonEncoder::new(
            schema.clone(),
            None,
            DateHandlingMode::FromEpoch,
            TimestampHandlingMode::Milli,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::Milli,
            JsonbHandlingMode::String,
        );
        Self {
            schema,
            pk_indices,
            db_name,
            sink_from_name,
            opts,
            key_encoder,
            val_encoder,
        }
    }
}

impl SinkFormatter for DebeziumJsonFormatter {
    type K = Value;
    type V = Value;

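    // For each row: Insert yields a create ("c") event, Delete yields a delete
    // ("d") event (optionally followed by a tombstone with a `None` value), and
    // an UpdateDelete/UpdateInsert pair is merged into a single update ("u")
    // event carrying both the `before` and `after` images.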
    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Value>, Option<Value>)>> {
        std::iter::from_coroutine(
            #[coroutine]
            || {
                let DebeziumJsonFormatter {
                    schema,
                    pk_indices,
                    db_name,
                    sink_from_name,
                    opts,
                    key_encoder,
                    val_encoder,
                } = self;
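                // `ts_ms` is the wall-clock time at formatting, reused for
                // every event emitted from this chunk.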
                let ts_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_millis() as u64;
                let source_field = json!({
                    // TODO: some fields are still missing from the `source` field.
                    // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
                    "db": db_name,
                    "table": sink_from_name,
                    "ts_ms": ts_ms,
                });

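                // An UpdateDelete stashes the encoded `before` image here; the
                // matching UpdateInsert that follows consumes it to emit a
                // single update event.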
                let mut update_cache: Option<Map<String, Value>> = None;

                for (op, row) in chunk.rows() {
                    let event_key_object: Option<Value> = Some(json!({
                        "schema": json!({
                            "type": "struct",
                            "fields": fields_pk_to_json(&schema.fields, pk_indices),
                            "optional": false,
                            "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
                        }),
                        "payload": tri!(key_encoder.encode(row)),
                    }));
                    let event_object: Option<Value> = match op {
                        Op::Insert => Some(json!({
                            "schema": schema_to_json(schema, db_name, sink_from_name),
                            "payload": {
                                "before": null,
                                "after": tri!(val_encoder.encode(row)),
                                "op": "c",
                                "ts_ms": ts_ms,
                                "source": source_field,
                            }
                        })),
                        Op::Delete => {
                            let value_obj = Some(json!({
                                "schema": schema_to_json(schema, db_name, sink_from_name),
                                "payload": {
                                    "before": tri!(val_encoder.encode(row)),
                                    "after": null,
                                    "op": "d",
                                    "ts_ms": ts_ms,
                                    "source": source_field,
                                }
                            }));
                            yield Ok((event_key_object.clone(), value_obj));

                            if opts.gen_tombstone {
                                // Tombstone event
                                // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
                                yield Ok((event_key_object, None));
                            }

                            continue;
                        }
                        Op::UpdateDelete => {
                            update_cache = Some(tri!(val_encoder.encode(row)));
                            continue;
                        }
                        Op::UpdateInsert => {
                            if let Some(before) = update_cache.take() {
                                Some(json!({
                                    "schema": schema_to_json(schema, db_name, sink_from_name),
                                    "payload": {
                                        "before": before,
                                        "after": tri!(val_encoder.encode(row)),
                                        "op": "u",
                                        "ts_ms": ts_ms,
                                        "source": source_field,
                                    }
                                }))
                            } else {
                                warn!(
                                    "UpdateInsert without a preceding UpdateDelete, skipping; row index {:?}",
                                    row.index()
                                );
                                continue;
                            }
                        }
                    };
                    yield Ok((event_key_object, event_object));
                }
            },
        )
    }
}

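/// Builds the Kafka Connect schema describing the Debezium envelope emitted by
/// the formatter: nullable `before`/`after` row structs, the `source` struct
/// (`db`, `table`, `ts_ms`), plus the top-level `op` and `ts_ms` fields.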
pub(crate) fn schema_to_json(schema: &Schema, db_name: &str, sink_from_name: &str) -> Value {
    let mut schema_fields = Vec::new();
    schema_fields.push(json!({
        "type": "struct",
        "fields": fields_to_json(&schema.fields),
        "optional": true,
        "field": "before",
        "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
    }));
    schema_fields.push(json!({
        "type": "struct",
        "fields": fields_to_json(&schema.fields),
        "optional": true,
        "field": "after",
        "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
    }));

    schema_fields.push(json!({
        "type": "struct",
        "optional": false,
        "name": concat_debezium_name_field(db_name, sink_from_name, "Source"),
        "fields": vec![
            json!({
                "type": "string",
                "optional": false,
                "field": "db"
            }),
            json!({
                "type": "string",
                "optional": true,
                "field": "table"
            }),
            json!({
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            }),
        ],

        "field": "source"
    }));
    schema_fields.push(json!({
        "type": "string",
        "optional": false,
        "field": "op"
    }));
    schema_fields.push(json!({
        "type": "int64",
        "optional": false,
        "field": "ts_ms"
    }));

    json!({
        "type": "struct",
        "fields": schema_fields,
        "optional": false,
        "name": concat_debezium_name_field(db_name, sink_from_name, "Envelope"),
    })
}

pub(crate) fn fields_pk_to_json(fields: &[Field], pk_indices: &[usize]) -> Value {
    let mut res = Vec::new();
    for idx in pk_indices {
        res.push(field_to_json(&fields[*idx]));
    }
    json!(res)
}

pub(crate) fn fields_to_json(fields: &[Field]) -> Value {
    let mut res = Vec::new();

    fields
        .iter()
        .for_each(|field| res.push(field_to_json(field)));

    json!(res)
}

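/// Maps a RisingWave column to a Kafka Connect field schema: a Connect `type`
/// plus, where applicable, a logical-type `name` (e.g. the Kafka Connect
/// date/time types or Debezium's `io.debezium.time.ZonedTimestamp`).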
pub(crate) fn field_to_json(field: &Field) -> Value {
    // mapping from 'https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-data-types'
    let (r#type, name) = match field.data_type() {
        risingwave_common::types::DataType::Boolean => ("boolean", ""),
        risingwave_common::types::DataType::Int16 => ("int16", ""),
        risingwave_common::types::DataType::Int32 => ("int32", ""),
        risingwave_common::types::DataType::Int64 => ("int64", ""),
        risingwave_common::types::DataType::Int256 => ("string", ""),
        risingwave_common::types::DataType::Float32 => ("float", ""),
        risingwave_common::types::DataType::Float64 => ("double", ""),
        // Currently, we only support handling decimal as string.
        // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-decimal-types
        risingwave_common::types::DataType::Decimal => ("string", ""),

        risingwave_common::types::DataType::Varchar => ("string", ""),

        // Use built-in Kafka Connect logical types.
        risingwave_common::types::DataType::Date => ("int32", "org.apache.kafka.connect.data.Date"),
        // May lose precision when Time/Timestamp has a fractional-second precision greater than 3,
        // but this is sufficient for TimestampHandlingMode::Milli.
        risingwave_common::types::DataType::Time => ("int64", "org.apache.kafka.connect.data.Time"),
        risingwave_common::types::DataType::Timestamp => {
            ("int64", "org.apache.kafka.connect.data.Timestamp")
        }

        risingwave_common::types::DataType::Timestamptz => {
            ("string", "io.debezium.time.ZonedTimestamp")
        }
        risingwave_common::types::DataType::Interval => ("string", "io.debezium.time.Interval"),

        risingwave_common::types::DataType::Bytea => ("bytes", ""),
        risingwave_common::types::DataType::Jsonb => ("string", "io.debezium.data.Json"),
        risingwave_common::types::DataType::Serial => ("int32", ""),
        // Since the original Debezium Postgres connector encodes HSTORE as a JSON string by
        // default, we do the same here for composite and container types.
        risingwave_common::types::DataType::Struct(_) => ("string", ""),
        risingwave_common::types::DataType::List { .. } => ("string", ""),
        risingwave_common::types::DataType::Map(_) => ("string", ""),
    };

    if name.is_empty() {
        json!({
            "field": field.name,
            "optional": true,
            "type": r#type
        })
    } else {
        json!({
            "field": field.name,
            "optional": true,
            "type": r#type,
            "name": name
        })
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::sink::utils::chunk_to_json;

    const SCHEMA_JSON_RESULT: &str = r#"{"fields":[{"field":"before","fields":[{"field":"v1","optional":true,"type":"int32"},{"field":"v2","optional":true,"type":"float"},{"field":"v3","optional":true,"type":"string"}],"name":"RisingWave.test_db.test_table.Key","optional":true,"type":"struct"},{"field":"after","fields":[{"field":"v1","optional":true,"type":"int32"},{"field":"v2","optional":true,"type":"float"},{"field":"v3","optional":true,"type":"string"}],"name":"RisingWave.test_db.test_table.Key","optional":true,"type":"struct"},{"field":"source","fields":[{"field":"db","optional":false,"type":"string"},{"field":"table","optional":true,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.test_db.test_table.Source","optional":false,"type":"struct"},{"field":"op","optional":false,"type":"string"},{"field":"ts_ms","optional":false,"type":"int64"}],"name":"RisingWave.test_db.test_table.Envelope","optional":false,"type":"struct"}"#;

    #[test]
    fn test_chunk_to_json() -> Result<()> {
        let chunk = StreamChunk::from_pretty(
            " i   f   <i,f>
            + 0 0.0 (0,0.0)
            + 1 1.0 (1,1.0)
            + 2 2.0 (2,2.0)
            + 3 3.0 (3,3.0)
            + 4 4.0 (4,4.0)
            + 5 5.0 (5,5.0)
            + 6 6.0 (6,6.0)
            + 7 7.0 (7,7.0)
            + 8 8.0 (8,8.0)
            + 9 9.0 (9,9.0)",
        );

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "v1".into(),
            },
            Field {
                data_type: DataType::Float32,
                name: "v2".into(),
            },
            Field {
                data_type: StructType::new(vec![
                    ("v4", DataType::Int32),
                    ("v5", DataType::Float32),
                ])
                .into(),
                name: "v3".into(),
            },
        ]);

        let encoder = JsonEncoder::new(
            schema.clone(),
            None,
            DateHandlingMode::FromEpoch,
            TimestampHandlingMode::Milli,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::Milli,
            JsonbHandlingMode::String,
        );
        let json_chunk = chunk_to_json(chunk, &encoder).unwrap();
        let schema_json = schema_to_json(&schema, "test_db", "test_table");
        assert_eq!(
            schema_json,
            serde_json::from_str::<Value>(SCHEMA_JSON_RESULT).unwrap()
        );
        assert_eq!(
            serde_json::from_str::<Value>(&json_chunk[0]).unwrap(),
            serde_json::from_str::<Value>("{\"v1\":0,\"v2\":0.0,\"v3\":{\"v4\":0,\"v5\":0.0}}")
                .unwrap()
        );

        Ok(())
    }
}