risingwave_frontend/optimizer/rule/
table_function_to_postgres_query_rule.rs1use itertools::Itertools;
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::{DataType, ScalarImpl};
18
19use super::prelude::{PlanRef, *};
20use crate::expr::{Expr, TableFunctionType};
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{LogicalPostgresQuery, LogicalTableFunction};
23
/// Optimizer rule that replaces a [`LogicalTableFunction`] whose function type is
/// [`TableFunctionType::PostgresQuery`] with a [`LogicalPostgresQuery`] plan node.
///
/// The rule carries no state.
pub struct TableFunctionToPostgresQueryRule {}
26impl Rule<Logical> for TableFunctionToPostgresQueryRule {
27 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
28 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
29 if logical_table_function.table_function.function_type != TableFunctionType::PostgresQuery {
30 return None;
31 }
32 assert!(!logical_table_function.with_ordinality);
33 let table_function_return_type = logical_table_function.table_function().return_type();
34
35 if let DataType::Struct(st) = table_function_return_type.clone() {
36 let fields = st
37 .iter()
38 .map(|(name, data_type)| Field::with_name(data_type.clone(), name.to_owned()))
39 .collect_vec();
40
41 let schema = Schema::new(fields);
42
43 assert_eq!(logical_table_function.table_function().args.len(), 6);
44 let mut eval_args = vec![];
45 for arg in &logical_table_function.table_function().args {
46 assert_eq!(arg.return_type(), DataType::Varchar);
47 let value = arg.try_fold_const().unwrap().unwrap();
48 match value {
49 Some(ScalarImpl::Utf8(s)) => {
50 eval_args.push(s.to_string());
51 }
52 _ => {
53 unreachable!("must be a varchar")
54 }
55 }
56 }
57 let hostname = eval_args[0].clone();
58 let port = eval_args[1].clone();
59 let username = eval_args[2].clone();
60 let password = eval_args[3].clone();
61 let database = eval_args[4].clone();
62 let query = eval_args[5].clone();
63
64 Some(
65 LogicalPostgresQuery::new(
66 logical_table_function.ctx(),
67 schema,
68 hostname,
69 port,
70 username,
71 password,
72 database,
73 query,
74 )
75 .into(),
76 )
77 } else {
78 unreachable!("TableFunction return type should be struct")
79 }
80 }
81}
82
83impl TableFunctionToPostgresQueryRule {
84 pub fn create() -> BoxedRule {
85 Box::new(TableFunctionToPostgresQueryRule {})
86 }
87}