risingwave_frontend/optimizer/plan_node/
logical_mysql_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchMySqlQuery, BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef,
23 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef, ToBatch,
24 ToStream, generic,
25};
26use crate::OptimizerContextRef;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31 ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
32 ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct LogicalMySqlQuery {
38 pub base: PlanBase<Logical>,
39 pub core: generic::MySqlQuery,
40}
41
42impl LogicalMySqlQuery {
43 pub fn new(
44 ctx: OptimizerContextRef,
45 schema: Schema,
46 hostname: String,
47 port: String,
48 username: String,
49 password: String,
50 database: String,
51 query: String,
52 ) -> Self {
53 let core = generic::MySqlQuery {
54 schema,
55 hostname,
56 port,
57 username,
58 password,
59 database,
60 query,
61 ctx,
62 };
63
64 let base = PlanBase::new_logical_with_core(&core);
65
66 LogicalMySqlQuery { base, core }
67 }
68}
69
70impl_plan_tree_node_for_leaf! { Logical, LogicalMySqlQuery}
71impl Distill for LogicalMySqlQuery {
72 fn distill<'a>(&self) -> XmlNode<'a> {
73 let fields = vec![("columns", column_names_pretty(self.schema()))];
74 childless_record("LogicalMySqlQuery", fields)
75 }
76}
77
78impl ColPrunable for LogicalMySqlQuery {
79 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
80 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
81 }
82}
83
84impl ExprRewritable<Logical> for LogicalMySqlQuery {}
85
86impl ExprVisitable for LogicalMySqlQuery {}
87
88impl PredicatePushdown for LogicalMySqlQuery {
89 fn predicate_pushdown(
90 &self,
91 predicate: Condition,
92 _ctx: &mut PredicatePushdownContext,
93 ) -> LogicalPlanRef {
94 LogicalFilter::create(self.clone().into(), predicate)
96 }
97}
98
99impl ToBatch for LogicalMySqlQuery {
100 fn to_batch(&self) -> Result<BatchPlanRef> {
101 Ok(BatchMySqlQuery::new(self.core.clone()).into())
102 }
103}
104
105impl ToStream for LogicalMySqlQuery {
106 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
107 bail!("mysql_query function is not supported in streaming mode")
108 }
109
110 fn logical_rewrite_for_stream(
111 &self,
112 _ctx: &mut RewriteStreamContext,
113 ) -> Result<(PlanRef, ColIndexMapping)> {
114 bail!("mysql_query function is not supported in streaming mode")
115 }
116}