risingwave_connector/parser/
postgres.rsuse std::sync::LazyLock;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Decimal, ScalarImpl};
use thiserror_ext::AsReport;
use crate::parser::scalar_adapter::ScalarAdapter;
use crate::parser::util::log_error;
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
macro_rules! handle_data_type {
($row:expr, $i:expr, $name:expr, $type:ty) => {{
let res = $row.try_get::<_, Option<$type>>($i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
log_error!($name, err, "parse column failed");
None
}
}
}};
}
pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> OwnedRow {
let mut datums = vec![];
for i in 0..schema.fields.len() {
let rw_field = &schema.fields[i];
let name = rw_field.name.as_str();
let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name);
datums.push(datum);
}
OwnedRow::new(datums)
}
fn postgres_cell_to_scalar_impl(
row: &tokio_postgres::Row,
data_type: &DataType,
i: usize,
name: &str,
) -> Option<ScalarImpl> {
match data_type {
DataType::Boolean
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
| DataType::Date
| DataType::Time
| DataType::Timestamp
| DataType::Timestamptz
| DataType::Jsonb
| DataType::Interval
| DataType::Bytea => {
let res = row.try_get::<_, Option<ScalarImpl>>(i);
match res {
Ok(val) => val,
Err(err) => {
log_error!(name, err, "parse column failed");
None
}
}
}
DataType::Decimal => {
handle_data_type!(row, i, name, Decimal)
}
DataType::Varchar | DataType::Int256 => {
let res = row.try_get::<_, Option<ScalarAdapter>>(i);
match res {
Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
Err(err) => {
log_error!(name, err, "parse column failed");
None
}
}
}
DataType::List(dtype) => match **dtype {
DataType::Struct(_) | DataType::List(_) | DataType::Serial => {
tracing::warn!(
"unsupported List data type {:?}, set the List to empty",
**dtype
);
None
}
_ => {
let res = row.try_get::<_, Option<ScalarAdapter>>(i);
match res {
Ok(val) => val.and_then(|v| v.into_scalar(data_type)),
Err(err) => {
log_error!(name, err, "parse list column failed");
None
}
}
}
},
DataType::Struct(_) | DataType::Serial | DataType::Map(_) => {
tracing::warn!(name, ?data_type, "unsupported data type, set to null");
None
}
}
}
#[cfg(test)]
mod tests {
use tokio_postgres::NoTls;
use crate::parser::scalar_adapter::EnumString;
const DB: &str = "postgres";
const USER: &str = "kexiang";
#[ignore]
#[tokio::test]
async fn enum_string_integration_test() {
let connect = format!(
"host=localhost port=5432 user={} password={} dbname={}",
USER, DB, DB
);
let (client, connection) = tokio_postgres::connect(connect.as_str(), NoTls)
.await
.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let _ = client
.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')", &[])
.await;
client
.execute(
"CREATE TABLE IF NOT EXISTS person(id int PRIMARY KEY, current_mood mood)",
&[],
)
.await
.unwrap();
client.execute("DELETE FROM person;", &[]).await.unwrap();
client
.execute("INSERT INTO person VALUES (1, 'happy')", &[])
.await
.unwrap();
let got: EnumString = client
.query_one("SELECT * FROM person", &[])
.await
.unwrap()
.get::<usize, Option<EnumString>>(1)
.unwrap();
assert_eq!("happy", got.0.as_str());
client.execute("DELETE FROM person", &[]).await.unwrap();
client
.execute("INSERT INTO person VALUES (2, $1)", &[&got])
.await
.unwrap();
let got_new: EnumString = client
.query_one("SELECT * FROM person", &[])
.await
.unwrap()
.get::<usize, Option<EnumString>>(1)
.unwrap();
assert_eq!("happy", got_new.0.as_str());
client.execute("DROP TABLE person", &[]).await.unwrap();
client.execute("DROP TYPE mood", &[]).await.unwrap();
}
}