risingwave_connector/parser/
postgres.rs1use std::sync::LazyLock;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::log::LogSuppresser;
19use risingwave_common::row::OwnedRow;
20use risingwave_common::types::{DataType, Decimal, ScalarImpl};
21use thiserror_ext::AsReport;
22
23use crate::parser::scalar_adapter::ScalarAdapter;
24use crate::parser::utils::log_error;
25
/// Lazily initialized log suppresser, constructed with `LogSuppresser::default`
/// on first access.
///
/// NOTE(review): nothing visible here names this static directly; presumably the
/// `log_error!` macro expands to a reference to it, so the (misspelled)
/// identifier should not be renamed without checking that macro — TODO confirm.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
27
/// Fetches column `$i` of `$row` as `Option<$type>` and turns it into a datum.
///
/// Evaluates to an `Option<ScalarImpl>`: the fetched `Option<$type>` is mapped
/// through `ScalarImpl::from`. When `try_get` returns an error, the error is
/// reported via `log_error!` together with the column name `$name`, and the
/// macro evaluates to `None`.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $type:ty) => {{
        match $row.try_get::<_, Option<$type>>($i) {
            Ok(cell) => cell.map(ScalarImpl::from),
            Err(err) => {
                log_error!($name, err, "parse column failed");
                None
            }
        }
    }};
}
40
41pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow {
42 let mut datums = vec![];
43 for i in 0..schema.fields.len() {
44 let rw_field = &schema.fields[i];
45 let name = rw_field.name.as_str();
46 let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name);
47 datums.push(datum);
48 }
49 OwnedRow::new(datums)
50}
51
52fn postgres_cell_to_scalar_impl(
53 row: &tokio_postgres::Row,
54 data_type: &DataType,
55 i: usize,
56 name: &str,
57) -> Option<ScalarImpl> {
58 match data_type {
62 DataType::Boolean
63 | DataType::Int16
64 | DataType::Int32
65 | DataType::Int64
66 | DataType::Float32
67 | DataType::Float64
68 | DataType::Date
69 | DataType::Time
70 | DataType::Timestamp
71 | DataType::Timestamptz
72 | DataType::Jsonb
73 | DataType::Interval
74 | DataType::Bytea => {
75 let res = row.try_get::<_, Option<ScalarImpl>>(i);
77 match res {
78 Ok(val) => val,
79 Err(err) => {
80 log_error!(name, err, "parse column failed");
81 None
82 }
83 }
84 }
85 DataType::Decimal => {
86 handle_data_type!(row, i, name, Decimal)
88 }
89 DataType::Varchar | DataType::Int256 => {
90 let res = row.try_get::<_, Option<ScalarAdapter>>(i);
91 match res {
92 Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
93 Err(err) => {
94 log_error!(name, err, "parse column failed");
95 None
96 }
97 }
98 }
99 DataType::List(dtype) => match **dtype {
100 DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
102 tracing::warn!(
103 "unsupported List data type {:?}, set the List to empty",
104 **dtype
105 );
106 None
107 }
108 _ => {
109 let res = row.try_get::<_, Option<ScalarAdapter>>(i);
110 match res {
111 Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
112 Err(err) => {
113 log_error!(name, err, "parse list column failed");
114 None
115 }
116 }
117 }
118 },
119 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
120 DataType::Struct(_) | DataType::Serial | DataType::Map(_) => {
121 tracing::warn!(name, ?data_type, "unsupported data type, set to null");
124 None
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use tokio_postgres::NoTls;

    use crate::parser::scalar_adapter::EnumString;
    const DB: &str = "postgres";
    const USER: &str = "kexiang";

    /// Runs `SELECT * FROM person`, expects exactly one row, and returns its
    /// column 1 (`current_mood`) as an `EnumString`.
    async fn read_mood(client: &tokio_postgres::Client) -> EnumString {
        let row = client
            .query_one("SELECT * FROM person", &[])
            .await
            .unwrap();
        row.get::<usize, Option<EnumString>>(1).unwrap()
    }

    /// Round-trips a Postgres enum value through `EnumString`: reads it from a
    /// row, writes it back as a query parameter, and reads it again.
    ///
    /// Marked `#[ignore]` and connects to a Postgres server on
    /// `localhost:5432`. Note that the connection string passes `DB` as the
    /// password as well as the database name.
    #[ignore]
    #[tokio::test]
    async fn enum_string_integration_test() {
        let conn_str = format!(
            "host=localhost port=5432 user={} password={} dbname={}",
            USER, DB, DB
        );
        let (client, connection) = tokio_postgres::connect(conn_str.as_str(), NoTls)
            .await
            .unwrap();

        // Drive the connection on its own task; the client only works while
        // the connection future is being polled.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });

        // The result is deliberately discarded (unlike the statements below,
        // which unwrap) — presumably because the type may already exist.
        let _ = client
            .execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[])
            .await;
        client
            .execute(
                "CREATE TABLE IF NOT EXISTS person(id int PRIMARY KEY, current_mood mood)",
                &[],
            )
            .await
            .unwrap();
        client.execute("DELETE FROM person;", &[]).await.unwrap();
        client
            .execute("INSERT INTO person VALUES (1, 'happy')", &[])
            .await
            .unwrap();

        // Read path: the enum column is read as `EnumString`.
        let first_read = read_mood(&client).await;
        assert_eq!("happy", first_read.0.as_str());

        client.execute("DELETE FROM person", &[]).await.unwrap();

        // Write path: the value just read is passed back as a query parameter.
        client
            .execute("INSERT INTO person VALUES (2, $1)", &[&first_read])
            .await
            .unwrap();

        let second_read = read_mood(&client).await;
        assert_eq!("happy", second_read.0.as_str());

        // Clean up the objects created above.
        client.execute("DROP TABLE person", &[]).await.unwrap();
        client.execute("DROP TYPE mood", &[]).await.unwrap();
    }
}