risingwave_frontend/optimizer/plan_node/
logical_postgres_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPostgresQuery, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef,
23 LogicalProject, PlanBase, PredicatePushdown, ToBatch, ToStream, generic,
24};
25use crate::OptimizerContextRef;
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
31 ToStreamContext,
32};
33use crate::utils::{ColIndexMapping, Condition};
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalPostgresQuery {
37 pub base: PlanBase<Logical>,
38 pub core: generic::PostgresQuery,
39}
40
41impl LogicalPostgresQuery {
42 pub fn new(
43 ctx: OptimizerContextRef,
44 schema: Schema,
45 hostname: String,
46 port: String,
47 username: String,
48 password: String,
49 database: String,
50 query: String,
51 ssl_mode: Option<String>,
52 ssl_root_cert: Option<String>,
53 ) -> Self {
54 let core = generic::PostgresQuery {
55 schema,
56 hostname,
57 port,
58 username,
59 password,
60 database,
61 query,
62 ssl_mode,
63 ssl_root_cert,
64 ctx,
65 };
66
67 let base = PlanBase::new_logical_with_core(&core);
68
69 LogicalPostgresQuery { base, core }
70 }
71}
72
73impl_plan_tree_node_for_leaf! { Logical, LogicalPostgresQuery}
74impl Distill for LogicalPostgresQuery {
75 fn distill<'a>(&self) -> XmlNode<'a> {
76 let fields = vec![("columns", column_names_pretty(self.schema()))];
77 childless_record("LogicalPostgresQuery", fields)
78 }
79}
80
81impl ColPrunable for LogicalPostgresQuery {
82 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
83 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
84 }
85}
86
87impl ExprRewritable<Logical> for LogicalPostgresQuery {}
88
89impl ExprVisitable for LogicalPostgresQuery {}
90
91impl PredicatePushdown for LogicalPostgresQuery {
92 fn predicate_pushdown(
93 &self,
94 predicate: Condition,
95 _ctx: &mut PredicatePushdownContext,
96 ) -> PlanRef {
97 LogicalFilter::create(self.clone().into(), predicate)
99 }
100}
101
102impl ToBatch for LogicalPostgresQuery {
103 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
104 Ok(BatchPostgresQuery::new(self.core.clone()).into())
105 }
106}
107
108impl ToStream for LogicalPostgresQuery {
109 fn to_stream(
110 &self,
111 _ctx: &mut ToStreamContext,
112 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
113 bail!("postgres_query function is not supported in streaming mode")
114 }
115
116 fn logical_rewrite_for_stream(
117 &self,
118 _ctx: &mut RewriteStreamContext,
119 ) -> Result<(PlanRef, ColIndexMapping)> {
120 bail!("postgres_query function is not supported in streaming mode")
121 }
122}