risingwave_frontend/optimizer/rule/
table_function_to_mysql_query_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::{DataType, ScalarImpl};
18
19use super::{BoxedRule, Rule};
20use crate::expr::{Expr, TableFunctionType};
21use crate::optimizer::PlanRef;
22use crate::optimizer::plan_node::generic::GenericPlanRef;
23// use crate::optimizer::plan_node::{LogicalMySqlQuery, LogicalTableFunction};
24use crate::optimizer::plan_node::{LogicalMySqlQuery, LogicalTableFunction};
25
26/// Transform a special `TableFunction` (with `MYSQL_QUERY` table function type) into a `LogicalMySqlQuery`
27pub struct TableFunctionToMySqlQueryRule {}
28impl Rule for TableFunctionToMySqlQueryRule {
29    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
31        if logical_table_function.table_function.function_type != TableFunctionType::MysqlQuery {
32            return None;
33        }
34        assert!(!logical_table_function.with_ordinality);
35        let table_function_return_type = logical_table_function.table_function().return_type();
36
37        if let DataType::Struct(st) = table_function_return_type.clone() {
38            let fields = st
39                .iter()
40                .map(|(name, data_type)| Field::with_name(data_type.clone(), name.to_owned()))
41                .collect_vec();
42
43            let schema = Schema::new(fields);
44
45            assert_eq!(logical_table_function.table_function().args.len(), 6);
46            let mut eval_args = vec![];
47            for arg in &logical_table_function.table_function().args {
48                assert_eq!(arg.return_type(), DataType::Varchar);
49                let value = arg.try_fold_const().unwrap().unwrap();
50                match value {
51                    Some(ScalarImpl::Utf8(s)) => {
52                        eval_args.push(s.to_string());
53                    }
54                    _ => {
55                        unreachable!("must be a varchar")
56                    }
57                }
58            }
59            let hostname = eval_args[0].clone();
60            let port = eval_args[1].clone();
61            let username = eval_args[2].clone();
62            let password = eval_args[3].clone();
63            let database = eval_args[4].clone();
64            let query = eval_args[5].clone();
65
66            Some(
67                LogicalMySqlQuery::new(
68                    logical_table_function.ctx(),
69                    schema,
70                    hostname,
71                    port,
72                    username,
73                    password,
74                    database,
75                    query,
76                )
77                .into(),
78            )
79        } else {
80            unreachable!("TableFunction return type should be struct")
81        }
82    }
83}
84
85impl TableFunctionToMySqlQueryRule {
86    pub fn create() -> BoxedRule {
87        Box::new(TableFunctionToMySqlQueryRule {})
88    }
89}