risingwave_frontend/optimizer/plan_node/
logical_mysql_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchMySqlQuery, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
23 PredicatePushdown, ToBatch, ToStream, generic,
24};
25use crate::OptimizerContextRef;
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
31 ToStreamContext,
32};
33use crate::utils::{ColIndexMapping, Condition};
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalMySqlQuery {
37 pub base: PlanBase<Logical>,
38 pub core: generic::MySqlQuery,
39}
40
41impl LogicalMySqlQuery {
42 pub fn new(
43 ctx: OptimizerContextRef,
44 schema: Schema,
45 hostname: String,
46 port: String,
47 username: String,
48 password: String,
49 database: String,
50 query: String,
51 ) -> Self {
52 let core = generic::MySqlQuery {
53 schema,
54 hostname,
55 port,
56 username,
57 password,
58 database,
59 query,
60 ctx,
61 };
62
63 let base = PlanBase::new_logical_with_core(&core);
64
65 LogicalMySqlQuery { base, core }
66 }
67}
68
69impl_plan_tree_node_for_leaf! {LogicalMySqlQuery}
70impl Distill for LogicalMySqlQuery {
71 fn distill<'a>(&self) -> XmlNode<'a> {
72 let fields = vec![("columns", column_names_pretty(self.schema()))];
73 childless_record("LogicalMySqlQuery", fields)
74 }
75}
76
77impl ColPrunable for LogicalMySqlQuery {
78 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
79 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
80 }
81}
82
83impl ExprRewritable for LogicalMySqlQuery {}
84
85impl ExprVisitable for LogicalMySqlQuery {}
86
87impl PredicatePushdown for LogicalMySqlQuery {
88 fn predicate_pushdown(
89 &self,
90 predicate: Condition,
91 _ctx: &mut PredicatePushdownContext,
92 ) -> PlanRef {
93 LogicalFilter::create(self.clone().into(), predicate)
95 }
96}
97
98impl ToBatch for LogicalMySqlQuery {
99 fn to_batch(&self) -> Result<PlanRef> {
100 Ok(BatchMySqlQuery::new(self.core.clone()).into())
101 }
102}
103
104impl ToStream for LogicalMySqlQuery {
105 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
106 bail!("mysql_query function is not supported in streaming mode")
107 }
108
109 fn logical_rewrite_for_stream(
110 &self,
111 _ctx: &mut RewriteStreamContext,
112 ) -> Result<(PlanRef, ColIndexMapping)> {
113 bail!("mysql_query function is not supported in streaming mode")
114 }
115}