risingwave_batch_executors/executor/postgres_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use futures_async_stream::try_stream;
17use futures_util::stream::StreamExt;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
22use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24use thiserror_ext::AsReport;
25use tokio_postgres;
26
27use crate::error::BatchError;
28use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29
/// `PostgresQuery` executor. Runs a query against a Postgres database.
pub struct PostgresQueryExecutor {
    // Output schema of the query result; drives row deserialization.
    schema: Schema,
    // Hostname of the remote Postgres server.
    host: String,
    // Port of the remote Postgres server, kept as a string as received from the plan node.
    port: String,
    // User to authenticate as.
    username: String,
    // Password for `username`.
    password: String,
    // Database (dbname) to run the query against.
    database: String,
    // SQL text executed verbatim on the remote server.
    query: String,
    // Identity string reported via `Executor::identity`.
    identity: String,
    // Maximum number of rows per emitted `DataChunk`.
    chunk_size: usize,
}
42
43impl Executor for PostgresQueryExecutor {
44    fn schema(&self) -> &risingwave_common::catalog::Schema {
45        &self.schema
46    }
47
48    fn identity(&self) -> &str {
49        &self.identity
50    }
51
52    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53        self.do_execute().boxed()
54    }
55}
56
57pub fn postgres_row_to_owned_row(
58    row: tokio_postgres::Row,
59    schema: &Schema,
60) -> Result<OwnedRow, BatchError> {
61    let mut datums = vec![];
62    for i in 0..schema.fields.len() {
63        let rw_field = &schema.fields[i];
64        let name = rw_field.name.as_str();
65        let datum = postgres_cell_to_scalar_impl(&row, &rw_field.data_type, i, name)?;
66        datums.push(datum);
67    }
68    Ok(OwnedRow::new(datums))
69}
70
71// TODO(kwannoel): Support more types, see postgres connector's ScalarAdapter.
72fn postgres_cell_to_scalar_impl(
73    row: &tokio_postgres::Row,
74    data_type: &DataType,
75    i: usize,
76    name: &str,
77) -> Result<Datum, BatchError> {
78    let datum = match data_type {
79        DataType::Boolean
80        | DataType::Int16
81        | DataType::Int32
82        | DataType::Int64
83        | DataType::Float32
84        | DataType::Float64
85        | DataType::Date
86        | DataType::Time
87        | DataType::Timestamp
88        | DataType::Timestamptz
89        | DataType::Jsonb
90        | DataType::Interval
91        | DataType::Varchar
92        | DataType::Bytea => {
93            // ScalarAdapter is also fine. But ScalarImpl is more efficient
94            row.try_get::<_, Option<ScalarImpl>>(i)?
95        }
96        DataType::Decimal => {
97            // Decimal is more efficient than PgNumeric in ScalarAdapter
98            let val = row.try_get::<_, Option<Decimal>>(i)?;
99            val.map(ScalarImpl::from)
100        }
101        _ => {
102            tracing::warn!(name, ?data_type, "unsupported data type, set to null");
103            None
104        }
105    };
106    Ok(datum)
107}
108
109impl PostgresQueryExecutor {
110    pub fn new(
111        schema: Schema,
112        host: String,
113        port: String,
114        username: String,
115        password: String,
116        database: String,
117        query: String,
118        identity: String,
119        chunk_size: usize,
120    ) -> Self {
121        Self {
122            schema,
123            host,
124            port,
125            username,
126            password,
127            database,
128            query,
129            identity,
130            chunk_size,
131        }
132    }
133
134    #[try_stream(ok = DataChunk, error = BatchError)]
135    async fn do_execute(self: Box<Self>) {
136        tracing::debug!("postgres_query_executor: started");
137        let conn_str = format!(
138            "host={} port={} user={} password={} dbname={}",
139            self.host, self.port, self.username, self.password, self.database
140        );
141        let (client, conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;
142
143        tokio::spawn(async move {
144            if let Err(e) = conn.await {
145                tracing::error!(
146                    "postgres_query_executor: connection error: {:?}",
147                    e.as_report()
148                );
149            }
150        });
151
152        let params: &[&str] = &[];
153        let row_stream = client
154            .query_raw(&self.query, params)
155            .await
156            .context("postgres_query received error from remote server")?;
157        let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
158        tracing::debug!("postgres_query_executor: query executed, start deserializing rows");
159        // deserialize the rows
160        #[for_await]
161        for row in row_stream {
162            let row = row?;
163            let owned_row = postgres_row_to_owned_row(row, &self.schema)?;
164            if let Some(chunk) = builder.append_one_row(owned_row) {
165                yield chunk;
166            }
167        }
168        if let Some(chunk) = builder.consume_all() {
169            yield chunk;
170        }
171        return Ok(());
172    }
173}
174
175pub struct PostgresQueryExecutorBuilder {}
176
177impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder {
178    async fn new_boxed_executor(
179        source: &ExecutorBuilder<'_>,
180        _inputs: Vec<BoxedExecutor>,
181    ) -> crate::error::Result<BoxedExecutor> {
182        let postgres_query_node = try_match_expand!(
183            source.plan_node().get_node_body().unwrap(),
184            NodeBody::PostgresQuery
185        )?;
186
187        Ok(Box::new(PostgresQueryExecutor::new(
188            Schema::from_iter(postgres_query_node.columns.iter().map(Field::from)),
189            postgres_query_node.hostname.clone(),
190            postgres_query_node.port.clone(),
191            postgres_query_node.username.clone(),
192            postgres_query_node.password.clone(),
193            postgres_query_node.database.clone(),
194            postgres_query_node.query.clone(),
195            source.plan_node().get_identity().clone(),
196            source.context().get_config().developer.chunk_size,
197        )))
198    }
199}