risingwave_frontend/optimizer/plan_node/
batch_mysql_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::MySqlQueryNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record, column_names_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{Distribution, Order};
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct BatchMySqlQuery {
30 pub base: PlanBase<Batch>,
31 pub core: generic::MySqlQuery,
32}
33
34impl BatchMySqlQuery {
35 pub fn new(core: generic::MySqlQuery) -> Self {
36 let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
37
38 Self { base, core }
39 }
40
41 pub fn column_names(&self) -> Vec<&str> {
42 self.schema().names_str()
43 }
44
45 pub fn clone_with_dist(&self) -> Self {
46 let base = self.base.clone_with_new_distribution(Distribution::Single);
47 Self {
48 base,
49 core: self.core.clone(),
50 }
51 }
52}
53
54impl_plan_tree_node_for_leaf! { BatchMySqlQuery }
55
56impl Distill for BatchMySqlQuery {
57 fn distill<'a>(&self) -> XmlNode<'a> {
58 let fields = vec![("columns", column_names_pretty(self.schema()))];
59 childless_record("BatchMySqlQuery", fields)
60 }
61}
62
63impl ToLocalBatch for BatchMySqlQuery {
64 fn to_local(&self) -> Result<PlanRef> {
65 Ok(self.clone_with_dist().into())
66 }
67}
68
69impl ToDistributedBatch for BatchMySqlQuery {
70 fn to_distributed(&self) -> Result<PlanRef> {
71 Ok(self.clone_with_dist().into())
72 }
73}
74
75impl ToBatchPb for BatchMySqlQuery {
76 fn to_batch_prost_body(&self) -> NodeBody {
77 NodeBody::MysqlQuery(MySqlQueryNode {
78 columns: self
79 .core
80 .columns()
81 .iter()
82 .map(|c| c.to_protobuf())
83 .collect(),
84 hostname: self.core.hostname.clone(),
85 port: self.core.port.clone(),
86 username: self.core.username.clone(),
87 password: self.core.password.clone(),
88 database: self.core.database.clone(),
89 query: self.core.query.clone(),
90 })
91 }
92}
93
94impl ExprRewritable for BatchMySqlQuery {}
95
96impl ExprVisitable for BatchMySqlQuery {}