risingwave_batch_executors/executor/
postgres_query.rs
1use anyhow::Context;
16use futures_async_stream::try_stream;
17use futures_util::stream::StreamExt;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
22use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24use thiserror_ext::AsReport;
25use tokio_postgres;
26
27use crate::error::BatchError;
28use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29
/// Batch executor that runs a query against a remote PostgreSQL server and
/// streams the result rows back as data chunks.
pub struct PostgresQueryExecutor {
    /// Schema of the produced chunks; also drives how each result cell is decoded.
    schema: Schema,
    /// Host of the PostgreSQL server, placed in the `host` connection parameter.
    host: String,
    /// Port of the PostgreSQL server, kept as a string and placed in the `port`
    /// connection parameter.
    port: String,
    /// User name placed in the `user` connection parameter.
    username: String,
    /// Password placed in the `password` connection parameter.
    password: String,
    /// Database name placed in the `dbname` connection parameter.
    database: String,
    /// Query text sent to the server (without bind parameters).
    query: String,
    /// Identity string reported through `Executor::identity`.
    identity: String,
    /// Row capacity handed to the `DataChunkBuilder` that assembles output chunks.
    chunk_size: usize,
}
42
43impl Executor for PostgresQueryExecutor {
44 fn schema(&self) -> &risingwave_common::catalog::Schema {
45 &self.schema
46 }
47
48 fn identity(&self) -> &str {
49 &self.identity
50 }
51
52 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53 self.do_execute().boxed()
54 }
55}
56
57pub fn postgres_row_to_owned_row(
58 row: tokio_postgres::Row,
59 schema: &Schema,
60) -> Result<OwnedRow, BatchError> {
61 let mut datums = vec![];
62 for i in 0..schema.fields.len() {
63 let rw_field = &schema.fields[i];
64 let name = rw_field.name.as_str();
65 let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?;
66 datums.push(datum);
67 }
68 Ok(OwnedRow::new(datums))
69}
70
71fn postgres_cell_to_scalar_impl(
73 row: &tokio_postgres::Row,
74 data_type: &DataType,
75 i: usize,
76 name: &str,
77) -> Result<Datum, BatchError> {
78 let datum = match data_type {
79 DataType::Boolean
80 | DataType::Int16
81 | DataType::Int32
82 | DataType::Int64
83 | DataType::Float32
84 | DataType::Float64
85 | DataType::Date
86 | DataType::Time
87 | DataType::Timestamp
88 | DataType::Timestamptz
89 | DataType::Jsonb
90 | DataType::Interval
91 | DataType::Varchar
92 | DataType::Bytea => {
93 row.try_get::<_, Option<ScalarImpl>>(i)?
95 }
96 DataType::Decimal => {
97 let val = row.try_get::<_, Option<Decimal>>(i)?;
99 val.map(ScalarImpl::from)
100 }
101 _ => {
102 tracing::warn!(name, ?data_type, "unsupported data type, set to null");
103 None
104 }
105 };
106 Ok(datum)
107}
108
109impl PostgresQueryExecutor {
110 pub fn new(
111 schema: Schema,
112 host: String,
113 port: String,
114 username: String,
115 password: String,
116 database: String,
117 query: String,
118 identity: String,
119 chunk_size: usize,
120 ) -> Self {
121 Self {
122 schema,
123 host,
124 port,
125 username,
126 password,
127 database,
128 query,
129 identity,
130 chunk_size,
131 }
132 }
133
134 #[try_stream(ok = DataChunk, error = BatchError)]
135 async fn do_execute(self: Box<Self>) {
136 tracing::debug!("postgres_query_executor: started");
137 let conn_str = format!(
138 "host={} port={} user={} password={} dbname={}",
139 self.host, self.port, self.username, self.password, self.database
140 );
141 let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;
142
143 tokio::spawn(async move {
144 if let Err(e) = conn.await {
145 tracing::error!(
146 "postgres_query_executor: connection error: {:?}",
147 e.as_report()
148 );
149 }
150 });
151
152 let params: &[&str] = &[];
153 let row_stream = client
154 .query_raw(&self.query, params)
155 .await
156 .context("postgres_query received error from remote server")?;
157 let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
158 tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
159 #[for_await]
161 for row in row_stream {
162 let row = row?;
163 let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
164 if let Some(chunk) = builder.append_one_row(owned_row) {
165 yield chunk;
166 }
167 }
168 if let Some(chunk) = builder.consume_all() {
169 yield chunk;
170 }
171 return Ok(());
172 }
173}
174
/// Builder that turns a `NodeBody::PostgresQuery` plan node into a boxed
/// [`PostgresQueryExecutor`].
pub struct PostgresQueryExecutorBuilder {}
176
177impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder {
178 async fn new_boxed_executor(
179 source: &ExecutorBuilder<'_>,
180 _inputs: Vec<BoxedExecutor>,
181 ) -> crate::error::Result<BoxedExecutor> {
182 let postgres_query_node = try_match_expand!(
183 source.plan_node().get_node_body().unwrap(),
184 NodeBody::PostgresQuery
185 )?;
186
187 Ok(Box::new(PostgresQueryExecutor::new(
188 Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)),
189 postgres_query_node.hostname.clone(),
190 postgres_query_node.port.clone(),
191 postgres_query_node.username.clone(),
192 postgres_query_node.password.clone(),
193 postgres_query_node.database.clone(),
194 postgres_query_node.query.clone(),
195 source.plan_node().get_identity().clone(),
196 source.context().get_config().developer.chunk_size,
197 )))
198 }
199}