risingwave_batch_executors/executor/
mysql_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use futures_async_stream::try_stream;
17use futures_util::stream::StreamExt;
18use mysql_async;
19use mysql_async::prelude::*;
20use risingwave_common::array::DataChunk;
21use risingwave_common::catalog::{Field, Schema};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_connector::parser::mysql_datum_to_rw_datum;
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26
27use crate::error::{BatchError, BatchExternalSystemError};
28use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29
30/// `MySqlQuery` executor. Runs a query against a `MySql` database.
31pub struct MySqlQueryExecutor {
32    schema: Schema,
33    host: String,
34    port: String,
35    username: String,
36    password: String,
37    database: String,
38    query: String,
39    identity: String,
40    chunk_size: usize,
41}
42
43impl Executor for MySqlQueryExecutor {
44    fn schema(&self) -> &risingwave_common::catalog::Schema {
45        &self.schema
46    }
47
48    fn identity(&self) -> &str {
49        &self.identity
50    }
51
52    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53        self.do_execute().boxed()
54    }
55}
56pub fn mysql_row_to_owned_row(
57    mut row: mysql_async::Row,
58    schema: &Schema,
59) -> Result<OwnedRow, BatchError> {
60    let mut datums = vec![];
61    for i in 0..schema.fields.len() {
62        let rw_field = &schema.fields[i];
63        let name = rw_field.name.as_str();
64        let datum = match mysql_datum_to_rw_datum(&mut row, i, name, &rw_field.data_type) {
65            Ok(val) => val,
66            Err(e) => {
67                let e = BatchExternalSystemError(e);
68                return Err(e.into());
69            }
70        };
71        datums.push(datum);
72    }
73    Ok(OwnedRow::new(datums))
74}
75
76impl MySqlQueryExecutor {
77    pub fn new(
78        schema: Schema,
79        host: String,
80        port: String,
81        username: String,
82        password: String,
83        database: String,
84        query: String,
85        identity: String,
86        chunk_size: usize,
87    ) -> Self {
88        Self {
89            schema,
90            host,
91            port,
92            username,
93            password,
94            database,
95            query,
96            identity,
97            chunk_size,
98        }
99    }
100
101    #[try_stream(ok = DataChunk, error = BatchError)]
102    async fn do_execute(self: Box<Self>) {
103        tracing::debug!("mysql_query_executor: started");
104        let database_opts: mysql_async::Opts = mysql_async::OptsBuilder::default()
105            .ip_or_hostname(self.host)
106            .tcp_port(self.port.parse::<u16>().unwrap()) // FIXME
107            .user(Some(self.username))
108            .pass(Some(self.password))
109            .db_name(Some(self.database))
110            .into();
111
112        let pool = mysql_async::Pool::new(database_opts);
113        let mut conn = pool
114            .get_conn()
115            .await
116            .context("failed to connect to mysql in batch executor")?;
117
118        let query = self.query;
119        let mut query_iter = conn
120            .query_iter(query)
121            .await
122            .context("failed to execute my_sql_query in batch executor")?;
123        let Some(row_stream) = query_iter.stream::<mysql_async::Row>().await? else {
124            bail!("failed to get row stream from mysql query")
125        };
126
127        let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
128        tracing::debug!("mysql_query_executor: query executed, start deserializing rows");
129        // deserialize the rows
130        #[for_await]
131        for row in row_stream {
132            let row = row?;
133            let owned_row = mysql_row_to_owned_row(row, &self.schema)?;
134            if let Some(chunk) = builder.append_one_row(owned_row) {
135                yield chunk;
136            }
137        }
138        if let Some(chunk) = builder.consume_all() {
139            yield chunk;
140        }
141        return Ok(());
142    }
143}
144
/// Stateless builder that creates [`MySqlQueryExecutor`]s from a `MysqlQuery` plan node.
pub struct MySqlQueryExecutorBuilder {}
146
147impl BoxedExecutorBuilder for MySqlQueryExecutorBuilder {
148    async fn new_boxed_executor(
149        source: &ExecutorBuilder<'_>,
150        _inputs: Vec<BoxedExecutor>,
151    ) -> crate::error::Result<BoxedExecutor> {
152        let mysql_query_node = try_match_expand!(
153            source.plan_node().get_node_body().unwrap(),
154            NodeBody::MysqlQuery
155        )?;
156
157        Ok(Box::new(MySqlQueryExecutor::new(
158            Schema::from_iter(mysql_query_node.columns.iter().map(Field::from)),
159            mysql_query_node.hostname.clone(),
160            mysql_query_node.port.clone(),
161            mysql_query_node.username.clone(),
162            mysql_query_node.password.clone(),
163            mysql_query_node.database.clone(),
164            mysql_query_node.query.clone(),
165            source.plan_node().get_identity().clone(),
166            source.context().get_config().developer.chunk_size,
167        )))
168    }
169}