risingwave_connector/parser/
postgres.rs1use std::sync::LazyLock;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::log::LogSuppresser;
19use risingwave_common::row::OwnedRow;
20use risingwave_common::types::{DataType, Decimal, ScalarImpl};
21use thiserror_ext::AsReport;
22
23use crate::parser::scalar_adapter::ScalarAdapter;
24use crate::parser::utils::log_error;
25
/// Shared [`LogSuppresser`], built with its default configuration on first access.
///
/// NOTE(review): the name is misspelled ("SUPPERSSER") and is never referenced directly in this
/// file; presumably the `log_error!` macro expands to a reference to it, so confirm against the
/// macro definition before renaming.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
27
/// Reads column `$i` of `$row` as an `Option<$type>` and lifts a present value into a
/// [`ScalarImpl`] via `From`.
///
/// Evaluates to an `Option<ScalarImpl>`. The result is `None` when the fetched value is `None`,
/// and also when `try_get` returns an error; in the latter case the error is reported through
/// `log_error!` together with the column name `$name`.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $type:ty) => {{
        match $row.try_get::<_, Option<$type>>($i) {
            Ok(cell) => cell.map(ScalarImpl::from),
            Err(err) => {
                log_error!($name, err, "parse column failed");
                None
            }
        }
    }};
}
40
41pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow {
42 let mut datums = vec![];
43 for i in 0..schema.fields.len() {
44 let rw_field = &schema.fields[i];
45 let name = rw_field.name.as_str();
46 let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name);
47 datums.push(datum);
48 }
49 OwnedRow::new(datums)
50}
51
52fn postgres_cell_to_scalar_impl(
53 row: &tokio_postgres::Row,
54 data_type: &DataType,
55 i: usize,
56 name: &str,
57) -> Option<ScalarImpl> {
58 match data_type {
62 DataType::Boolean
63 | DataType::Int16
64 | DataType::Int32
65 | DataType::Int64
66 | DataType::Float32
67 | DataType::Float64
68 | DataType::Date
69 | DataType::Time
70 | DataType::Timestamp
71 | DataType::Timestamptz
72 | DataType::Jsonb
73 | DataType::Interval
74 | DataType::Bytea => {
75 let res = row.try_get::<_, Option<ScalarImpl>>(i);
77 match res {
78 Ok(val) => val,
79 Err(err) => {
80 log_error!(name, err, "parse column failed");
81 None
82 }
83 }
84 }
85 DataType::Decimal => {
86 handle_data_type!(row, i, name, Decimal)
88 }
89 DataType::Varchar | DataType::Int256 => {
90 let res = row.try_get::<_, Option<ScalarAdapter>>(i);
91 match res {
92 Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
93 Err(err) => {
94 log_error!(name, err, "parse column failed");
95 None
96 }
97 }
98 }
99 DataType::List(dtype) => match **dtype {
100 DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
102 tracing::warn!(
103 "unsupported List data type {:?}, set the List to empty",
104 **dtype
105 );
106 None
107 }
108 _ => {
109 let res = row.try_get::<_, Option<ScalarAdapter>>(i);
110 match res {
111 Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
112 Err(err) => {
113 log_error!(name, err, "parse list column failed");
114 None
115 }
116 }
117 }
118 },
119 DataType::Struct(_) | DataType::Serial | DataType::Map(_) => {
120 tracing::warn!(name, ?data_type, "unsupported data type, set to null");
123 None
124 }
125 }
126}
127
#[cfg(test)]
mod tests {
    use tokio_postgres::NoTls;

    use crate::parser::scalar_adapter::EnumString;

    const DB: &str = "postgres";
    const USER: &str = "kexiang";

    /// Fetches the single row of `person` and returns its column at index 1 as an `EnumString`.
    ///
    /// Panics if the query fails or the column value is `None`.
    async fn read_mood(client: &tokio_postgres::Client) -> EnumString {
        let row = client
            .query_one("SELECT * FROM person", &[])
            .await
            .unwrap();
        row.get::<usize, Option<EnumString>>(1).unwrap()
    }

    /// Round-trips a Postgres enum value through `EnumString`: reads it from a row, writes it
    /// back as a query parameter, and reads it again.
    ///
    /// Ignored by default because it connects to a Postgres server on `localhost:5432`.
    #[ignore]
    #[tokio::test]
    async fn enum_string_integration_test() {
        // Note that `DB` is used for both the password and the database name.
        let conn_str = format!(
            "host=localhost port=5432 user={} password={} dbname={}",
            USER, DB, DB
        );
        let (client, connection) = tokio_postgres::connect(&conn_str, NoTls).await.unwrap();

        // Drive the connection on a separate task while the client issues queries.
        tokio::spawn(async move {
            if let Err(err) = connection.await {
                eprintln!("connection error: {}", err);
            }
        });

        // The result is deliberately discarded; presumably the type may already exist from a
        // previous run.
        let _ = client
            .execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[])
            .await;
        client
            .execute(
                "CREATE TABLE IF NOT EXISTS person(id int PRIMARY KEY, current_mood mood)",
                &[],
            )
            .await
            .unwrap();
        client.execute("DELETE FROM person;", &[]).await.unwrap();
        client
            .execute("INSERT INTO person VALUES (1, 'happy')", &[])
            .await
            .unwrap();

        // Read path: the enum column comes back as an `EnumString`.
        let first_read = read_mood(&client).await;
        assert_eq!("happy", first_read.0.as_str());

        client.execute("DELETE FROM person", &[]).await.unwrap();

        // Write path: the value just read is bound as a query parameter.
        client
            .execute("INSERT INTO person VALUES (2, $1)", &[&first_read])
            .await
            .unwrap();

        let second_read = read_mood(&client).await;
        assert_eq!("happy", second_read.0.as_str());

        // Clean up the objects created above.
        client.execute("DROP TABLE person", &[]).await.unwrap();
        client.execute("DROP TYPE mood", &[]).await.unwrap();
    }
}