risingwave_batch_executors/executor/
mysql_query.rs1use anyhow::Context;
16use futures_async_stream::try_stream;
17use futures_util::stream::StreamExt;
18use mysql_async;
19use mysql_async::prelude::*;
20use risingwave_common::array::DataChunk;
21use risingwave_common::catalog::{Field, Schema};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_connector::parser::mysql_datum_to_rw_datum;
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26
27use crate::error::{BatchError, BatchExternalSystemError};
28use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
29
30pub struct MySqlQueryExecutor {
32 schema: Schema,
33 host: String,
34 port: String,
35 username: String,
36 password: String,
37 database: String,
38 query: String,
39 identity: String,
40 chunk_size: usize,
41}
42
43impl Executor for MySqlQueryExecutor {
44 fn schema(&self) -> &risingwave_common::catalog::Schema {
45 &self.schema
46 }
47
48 fn identity(&self) -> &str {
49 &self.identity
50 }
51
52 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53 self.do_execute().boxed()
54 }
55}
56pub fn mysql_row_to_owned_row(
57 mut row: mysql_async::Row,
58 schema: &Schema,
59) -> Result<OwnedRow, BatchError> {
60 let mut datums = vec![];
61 for i in 0..schema.fields.len() {
62 let rw_field = &schema.fields[i];
63 let name = rw_field.name.as_str();
64 let datum = match mysql_datum_to_rw_datum(&mut row, i, name, &rw_field.data_type) {
65 Ok(val) => val,
66 Err(e) => {
67 let e = BatchExternalSystemError(e);
68 return Err(e.into());
69 }
70 };
71 datums.push(datum);
72 }
73 Ok(OwnedRow::new(datums))
74}
75
76impl MySqlQueryExecutor {
77 pub fn new(
78 schema: Schema,
79 host: String,
80 port: String,
81 username: String,
82 password: String,
83 database: String,
84 query: String,
85 identity: String,
86 chunk_size: usize,
87 ) -> Self {
88 Self {
89 schema,
90 host,
91 port,
92 username,
93 password,
94 database,
95 query,
96 identity,
97 chunk_size,
98 }
99 }
100
101 #[try_stream(ok = DataChunk, error = BatchError)]
102 async fn do_execute(self: Box<Self>) {
103 tracing::debug!("mysql_query_executor: started");
104 let database_opts: mysql_async::Opts = mysql_async::OptsBuilder::default()
105 .ip_or_hostname(self.host)
106 .tcp_port(self.port.parse::<u16>().unwrap()) .user(Some(self.username))
108 .pass(Some(self.password))
109 .db_name(Some(self.database))
110 .into();
111
112 let pool = mysql_async::Pool::new(database_opts);
113 let mut conn = pool
114 .get_conn()
115 .await
116 .context("failed to connect to mysql in batch executor")?;
117
118 let query = self.query;
119 let mut query_iter = conn
120 .query_iter(query)
121 .await
122 .context("failed to execute my_sql_query in batch executor")?;
123 let Some(row_stream) = query_iter.stream::<mysql_async::Row>().await? else {
124 bail!("failed to get row stream from mysql query")
125 };
126
127 let mut builder = DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
128 tracing::debug!("mysql_query_executor: query executed, start deserializing rows");
129 #[for_await]
131 for row in row_stream {
132 let row = row?;
133 let owned_row = mysql_row_to_owned_row(row, &self.schema)?;
134 if let Some(chunk) = builder.append_one_row(owned_row) {
135 yield chunk;
136 }
137 }
138 if let Some(chunk) = builder.consume_all() {
139 yield chunk;
140 }
141 return Ok(());
142 }
143}
144
145pub struct MySqlQueryExecutorBuilder {}
146
147impl BoxedExecutorBuilder for MySqlQueryExecutorBuilder {
148 async fn new_boxed_executor(
149 source: &ExecutorBuilder<'_>,
150 _inputs: Vec<BoxedExecutor>,
151 ) -> crate::error::Result<BoxedExecutor> {
152 let mysql_query_node = try_match_expand!(
153 source.plan_node().get_node_body().unwrap(),
154 NodeBody::MysqlQuery
155 )?;
156
157 Ok(Box::new(MySqlQueryExecutor::new(
158 Schema::from_iter(mysql_query_node.columns.iter().map(Field::from)),
159 mysql_query_node.hostname.clone(),
160 mysql_query_node.port.clone(),
161 mysql_query_node.username.clone(),
162 mysql_query_node.password.clone(),
163 mysql_query_node.database.clone(),
164 mysql_query_node.query.clone(),
165 source.plan_node().get_identity().clone(),
166 source.context().get_config().developer.chunk_size,
167 )))
168 }
169}