risingwave_frontend/optimizer/rule/
table_function_to_mysql_query_rule.rs1use itertools::Itertools;
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::{DataType, ScalarImpl};
18
19use super::{BoxedRule, Rule};
20use crate::expr::{Expr, TableFunctionType};
21use crate::optimizer::PlanRef;
22use crate::optimizer::plan_node::generic::GenericPlanRef;
23use crate::optimizer::plan_node::{LogicalMySqlQuery, LogicalTableFunction};
25
/// Optimizer rule that replaces a [`LogicalTableFunction`] of type
/// [`TableFunctionType::MysqlQuery`] with a [`LogicalMySqlQuery`] plan node.
///
/// The rule carries no state.
pub struct TableFunctionToMySqlQueryRule {}
28impl Rule for TableFunctionToMySqlQueryRule {
29 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
31 if logical_table_function.table_function.function_type != TableFunctionType::MysqlQuery {
32 return None;
33 }
34 assert!(!logical_table_function.with_ordinality);
35 let table_function_return_type = logical_table_function.table_function().return_type();
36
37 if let DataType::Struct(st) = table_function_return_type.clone() {
38 let fields = st
39 .iter()
40 .map(|(name, data_type)| Field::with_name(data_type.clone(), name.to_owned()))
41 .collect_vec();
42
43 let schema = Schema::new(fields);
44
45 assert_eq!(logical_table_function.table_function().args.len(), 6);
46 let mut eval_args = vec![];
47 for arg in &logical_table_function.table_function().args {
48 assert_eq!(arg.return_type(), DataType::Varchar);
49 let value = arg.try_fold_const().unwrap().unwrap();
50 match value {
51 Some(ScalarImpl::Utf8(s)) => {
52 eval_args.push(s.to_string());
53 }
54 _ => {
55 unreachable!("must be a varchar")
56 }
57 }
58 }
59 let hostname = eval_args[0].clone();
60 let port = eval_args[1].clone();
61 let username = eval_args[2].clone();
62 let password = eval_args[3].clone();
63 let database = eval_args[4].clone();
64 let query = eval_args[5].clone();
65
66 Some(
67 LogicalMySqlQuery::new(
68 logical_table_function.ctx(),
69 schema,
70 hostname,
71 port,
72 username,
73 password,
74 database,
75 query,
76 )
77 .into(),
78 )
79 } else {
80 unreachable!("TableFunction return type should be struct")
81 }
82 }
83}
84
85impl TableFunctionToMySqlQueryRule {
86 pub fn create() -> BoxedRule {
87 Box::new(TableFunctionToMySqlQueryRule {})
88 }
89}