risingwave_connector/parser/
postgres.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::log::LogSuppresser;
19use risingwave_common::row::OwnedRow;
20use risingwave_common::types::{DataType, Decimal, ScalarImpl};
21use thiserror_ext::AsReport;
22
23use crate::parser::scalar_adapter::ScalarAdapter;
24use crate::parser::utils::log_error;
25
26static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
27
/// Reads column `$i` of `$row` as `Option<$type>` and lifts the value into a `ScalarImpl`.
///
/// Evaluates to `Option<ScalarImpl>`: `None` when the cell read yields `None`, or when
/// `try_get` fails — in which case the error is reported through `log_error!` under the
/// column name `$name`.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $type:ty) => {{
        match $row.try_get::<_, Option<$type>>($i) {
            Ok(cell) => cell.map(ScalarImpl::from),
            Err(err) => {
                log_error!($name, err, "parse column failed");
                None
            }
        }
    }};
}
40
41pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow {
42    let mut datums = vec![];
43    for i in 0..schema.fields.len() {
44        let rw_field = &schema.fields[i];
45        let name = rw_field.name.as_str();
46        let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name);
47        datums.push(datum);
48    }
49    OwnedRow::new(datums)
50}
51
52fn postgres_cell_to_scalar_impl(
53    row: &tokio_postgres::Row,
54    data_type: &DataType,
55    i: usize,
56    name: &str,
57) -> Option<ScalarImpl> {
58    // We observe several incompatibility issue in Debezium's Postgres connector. We summarize them here:
59    // Issue #1. The null of enum list is not supported in Debezium. An enum list contains `NULL` will fallback to `NULL`.
60    // Issue #2. In our parser, when there's inf, -inf, nan or invalid item in a list, the whole list will fallback null.
61    match data_type {
62        DataType::Boolean
63        | DataType::Int16
64        | DataType::Int32
65        | DataType::Int64
66        | DataType::Float32
67        | DataType::Float64
68        | DataType::Date
69        | DataType::Time
70        | DataType::Timestamp
71        | DataType::Timestamptz
72        | DataType::Jsonb
73        | DataType::Interval
74        | DataType::Bytea => {
75            // ScalarAdapter is also fine. But ScalarImpl is more efficient
76            let res = row.try_get::<_, Option<ScalarImpl>>(i);
77            match res {
78                Ok(val) => val,
79                Err(err) => {
80                    log_error!(name, err, "parse column failed");
81                    None
82                }
83            }
84        }
85        DataType::Decimal => {
86            // Decimal is more efficient than PgNumeric in ScalarAdapter
87            handle_data_type!(row, i, name, Decimal)
88        }
89        DataType::Varchar | DataType::Int256 => {
90            let res = row.try_get::<_, Option<ScalarAdapter>>(i);
91            match res {
92                Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
93                Err(err) => {
94                    log_error!(name, err, "parse column failed");
95                    None
96                }
97            }
98        }
99        DataType::List(dtype) => match **dtype {
100            // TODO(Kexiang): allow DataType::List(_)
101            DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
102                tracing::warn!(
103                    "unsupported List data type {:?}, set the List to empty",
104                    **dtype
105                );
106                None
107            }
108            _ => {
109                let res = row.try_get::<_, Option<ScalarAdapter>>(i);
110                match res {
111                    Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
112                    Err(err) => {
113                        log_error!(name, err, "parse list column failed");
114                        None
115                    }
116                }
117            }
118        },
119        DataType::Struct(_) | DataType::Serial | DataType::Map(_) => {
120            // Is this branch reachable?
121            // Struct and Serial are not supported
122            tracing::warn!(name, ?data_type, "unsupported data type, set to null");
123            None
124        }
125    }
126}
127
#[cfg(test)]
mod tests {
    use tokio_postgres::NoTls;

    use crate::parser::scalar_adapter::EnumString;

    const DB: &str = "postgres";
    const USER: &str = "kexiang";
    // The connection string uses the database name as the password as well.
    const PASSWORD: &str = DB;

    /// Round-trips a Postgres enum value through `EnumString` in both directions
    /// (reading it from a row, then binding it as a query parameter).
    ///
    /// Ignored by default: it connects to a Postgres server on `localhost:5432`.
    #[ignore]
    #[tokio::test]
    async fn enum_string_integration_test() {
        let conn_str =
            format!("host=localhost port=5432 user={USER} password={PASSWORD} dbname={DB}");
        let (client, connection) = tokio_postgres::connect(&conn_str, NoTls).await.unwrap();

        // The connection object drives the actual communication with the database,
        // so it is spawned onto its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        // The result is deliberately ignored so that an already-existing type is tolerated.
        let _ = client
            .execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[])
            .await;
        client
            .execute(
                "CREATE TABLE IF NOT EXISTS person(id int PRIMARY KEY, current_mood mood)",
                &[],
            )
            .await
            .unwrap();
        client.execute("DELETE FROM person;", &[]).await.unwrap();
        client
            .execute("INSERT INTO person VALUES (1, 'happy')", &[])
            .await
            .unwrap();

        // from_sql: read the enum column back as an `EnumString`.
        let first_row = client.query_one("SELECT * FROM person", &[]).await.unwrap();
        let got: EnumString = first_row.get::<usize, Option<EnumString>>(1).unwrap();
        assert_eq!("happy", got.0.as_str());

        client.execute("DELETE FROM person", &[]).await.unwrap();

        // to_sql: bind the `EnumString` as a parameter and read it back again.
        client
            .execute("INSERT INTO person VALUES (2, $1)", &[&got])
            .await
            .unwrap();

        let second_row = client.query_one("SELECT * FROM person", &[]).await.unwrap();
        let got_new: EnumString = second_row.get::<usize, Option<EnumString>>(1).unwrap();
        assert_eq!("happy", got_new.0.as_str());

        client.execute("DROP TABLE person", &[]).await.unwrap();
        client.execute("DROP TYPE mood", &[]).await.unwrap();
    }
}