risingwave_batch_executors/executor/
postgres_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use futures_async_stream::try_stream;
17use futures_util::stream::StreamExt;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
22use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24use thiserror_ext::AsReport;
25use tokio_postgres;
26
27use crate::error::BatchError;
28use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29
30/// `PostgresQuery` executor. Runs a query against a Postgres database.
/// `PostgresQuery` executor. Runs a query against a Postgres database.
pub struct PostgresQueryExecutor {
    // Output schema of the query result; drives per-cell deserialization.
    schema: Schema,
    // Remote Postgres server hostname.
    host: String,
    // Port kept as a string; parsed to a number lazily in `do_execute`.
    port: String,
    // Credentials and target database for the connection.
    username: String,
    password: String,
    database: String,
    // SQL text executed verbatim on the remote server (no parameters).
    query: String,
    // Identity string reported via `Executor::identity`.
    identity: String,
    // Maximum number of rows packed into each emitted `DataChunk`.
    chunk_size: usize,
}
42
43impl Executor for PostgresQueryExecutor {
44    fn schema(&self) -> &risingwave_common::catalog::Schema {
45        &self.schema
46    }
47
48    fn identity(&self) -> &str {
49        &self.identity
50    }
51
52    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53        self.do_execute().boxed()
54    }
55}
56
57pub fn postgres_row_to_owned_row(
58    row: tokio_postgres::Row,
59    schema: &Schema,
60) -> Result<OwnedRow, BatchError> {
61    let mut datums = vec![];
62    for i in 0..schema.fields.len() {
63        let rw_field = &schema.fields[i];
64        let name = rw_field.name.as_str();
65        let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?;
66        datums.push(datum);
67    }
68    Ok(OwnedRow::new(datums))
69}
70
71// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter.
72fn postgres_cell_to_scalar_impl(
73    row: &tokio_postgres::Row,
74    data_type: &DataType,
75    i: usize,
76    name: &str,
77) -> Result<Datum, BatchError> {
78    let datum = match data_type {
79        DataType::Boolean
80        | DataType::Int16
81        | DataType::Int32
82        | DataType::Int64
83        | DataType::Float32
84        | DataType::Float64
85        | DataType::Date
86        | DataType::Time
87        | DataType::Timestamp
88        | DataType::Timestamptz
89        | DataType::Jsonb
90        | DataType::Interval
91        | DataType::Varchar
92        | DataType::Bytea => {
93            // ScalarAdapter is also fine. But ScalarImpl is more efficient
94            row.try_get::<_, Option<ScalarImpl>>(i)?
95        }
96        DataType::Decimal => {
97            // Decimal is more efficient than PgNumeric in ScalarAdapter
98            let val = row.try_get::<_, Option<Decimal>>(i)?;
99            val.map(ScalarImpl::from)
100        }
101        _ => {
102            tracing::warn!(name, ?data_type, "unsupported data type, set to null");
103            None
104        }
105    };
106    Ok(datum)
107}
108
impl PostgresQueryExecutor {
    /// Creates a new `PostgresQueryExecutor`.
    ///
    /// `port` is kept as a string here; it is parsed when the executor runs,
    /// so an invalid port surfaces as an error from the stream, not from `new`.
    pub fn new(
        schema: Schema,
        host: String,
        port: String,
        username: String,
        password: String,
        database: String,
        query: String,
        identity: String,
        chunk_size: usize,
    ) -> Self {
        Self {
            schema,
            host,
            port,
            username,
            password,
            database,
            query,
            identity,
            chunk_size,
        }
    }

    /// Connects to the remote Postgres server, runs `self.query`, and yields
    /// the result rows batched into `DataChunk`s of at most `self.chunk_size`
    /// rows. Connection and query errors are surfaced as `BatchError`s on the
    /// stream.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        tracing::debug!("postgres_query_executor: started");
        let mut conf = tokio_postgres::Config::new();
        // Parse the port eagerly so a bad value yields a clear `InvalidParam`
        // error instead of an opaque connection failure.
        let port = self
            .port
            .parse()
            .map_err(|_| risingwave_expr::ExprError::InvalidParam {
                name: "port",
                reason: self.port.clone().into(),
            })?;
        let (client, conn) = conf
            .host(&self.host)
            .port(port)
            .user(&self.username)
            .password(self.password)
            .dbname(&self.database)
            .connect(tokio_postgres::NoTls)
            .await?;

        // tokio_postgres requires the connection object to be polled for the
        // client to make progress; drive it on a background task. A connection
        // error here is only logged — subsequent client calls will fail and
        // propagate the error through the stream.
        tokio::spawn(async move {
            if let Err(e) = conn.await {
                tracing::error!(
                    "postgres_query_executor: connection error: {:?}",
                    e.as_report()
                );
            }
        });

        // The query carries no bind parameters; the SQL text runs as-is.
        let params: &[&str] = &[];
        let row_stream = client
            .query_raw(&self.query, params)
            .await
            .context("postgres_query received error from remote server")?;
        let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
        tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
        // deserialize the rows
        #[for_await]
        for row in row_stream {
            let row = row?;
            let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
            // `append_one_row` returns a chunk only once `chunk_size` rows
            // have accumulated; emit it downstream as soon as it is full.
            if let Some(chunk) = builder.append_one_row(owned_row) {
                yield chunk;
            }
        }
        // Flush any final partial chunk left in the builder.
        if let Some(chunk) = builder.consume_all() {
            yield chunk;
        }
        return Ok(());
    }
}
185
186pub struct PostgresQueryExecutorBuilder {}
187
188impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder {
189    async fn new_boxed_executor(
190        source: &ExecutorBuilder<'_>,
191        _inputs: Vec<BoxedExecutor>,
192    ) -> crate::error::Result<BoxedExecutor> {
193        let postgres_query_node = try_match_expand!(
194            source.plan_node().get_node_body().unwrap(),
195            NodeBody::PostgresQuery
196        )?;
197
198        Ok(Box::new(PostgresQueryExecutor::new(
199            Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)),
200            postgres_query_node.hostname.clone(),
201            postgres_query_node.port.clone(),
202            postgres_query_node.username.clone(),
203            postgres_query_node.password.clone(),
204            postgres_query_node.database.clone(),
205            postgres_query_node.query.clone(),
206            source.plan_node().get_identity().clone(),
207            source.context().get_config().developer.chunk_size,
208        )))
209    }
210}