risingwave_frontend/optimizer/plan_node/
logical_mysql_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22    BatchMySqlQuery, BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalPlanRef,
23    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef, ToBatch,
24    ToStream, generic,
25};
26use crate::OptimizerContextRef;
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::column_names_pretty;
30use crate::optimizer::plan_node::{
31    ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
32    ToStreamContext,
33};
34use crate::utils::{ColIndexMapping, Condition};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct LogicalMySqlQuery {
38    pub base: PlanBase<Logical>,
39    pub core: generic::MySqlQuery,
40}
41
42impl LogicalMySqlQuery {
43    pub fn new(
44        ctx: OptimizerContextRef,
45        schema: Schema,
46        hostname: String,
47        port: String,
48        username: String,
49        password: String,
50        database: String,
51        query: String,
52    ) -> Self {
53        let core = generic::MySqlQuery {
54            schema,
55            hostname,
56            port,
57            username,
58            password,
59            database,
60            query,
61            ctx,
62        };
63
64        let base = PlanBase::new_logical_with_core(&core);
65
66        LogicalMySqlQuery { base, core }
67    }
68}
69
70impl_plan_tree_node_for_leaf! { Logical, LogicalMySqlQuery}
71impl Distill for LogicalMySqlQuery {
72    fn distill<'a>(&self) -> XmlNode<'a> {
73        let fields = vec![("columns", column_names_pretty(self.schema()))];
74        childless_record("LogicalMySqlQuery", fields)
75    }
76}
77
78impl ColPrunable for LogicalMySqlQuery {
79    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
80        LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
81    }
82}
83
84impl ExprRewritable<Logical> for LogicalMySqlQuery {}
85
86impl ExprVisitable for LogicalMySqlQuery {}
87
88impl PredicatePushdown for LogicalMySqlQuery {
89    fn predicate_pushdown(
90        &self,
91        predicate: Condition,
92        _ctx: &mut PredicatePushdownContext,
93    ) -> LogicalPlanRef {
94        // No pushdown.
95        LogicalFilter::create(self.clone().into(), predicate)
96    }
97}
98
99impl ToBatch for LogicalMySqlQuery {
100    fn to_batch(&self) -> Result<BatchPlanRef> {
101        Ok(BatchMySqlQuery::new(self.core.clone()).into())
102    }
103}
104
105impl ToStream for LogicalMySqlQuery {
106    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
107        bail!("mysql_query function is not supported in streaming mode")
108    }
109
110    fn logical_rewrite_for_stream(
111        &self,
112        _ctx: &mut RewriteStreamContext,
113    ) -> Result<(PlanRef, ColIndexMapping)> {
114        bail!("mysql_query function is not supported in streaming mode")
115    }
116}