risingwave_frontend/optimizer/plan_node/
logical_postgres_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPostgresQuery, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
23 PredicatePushdown, ToBatch, ToStream, generic,
24};
25use crate::OptimizerContextRef;
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
31 ToStreamContext,
32};
33use crate::utils::{ColIndexMapping, Condition};
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalPostgresQuery {
37 pub base: PlanBase<Logical>,
38 pub core: generic::PostgresQuery,
39}
40
41impl LogicalPostgresQuery {
42 pub fn new(
43 ctx: OptimizerContextRef,
44 schema: Schema,
45 hostname: String,
46 port: String,
47 username: String,
48 password: String,
49 database: String,
50 query: String,
51 ) -> Self {
52 let core = generic::PostgresQuery {
53 schema,
54 hostname,
55 port,
56 username,
57 password,
58 database,
59 query,
60 ctx,
61 };
62
63 let base = PlanBase::new_logical_with_core(&core);
64
65 LogicalPostgresQuery { base, core }
66 }
67}
68
69impl_plan_tree_node_for_leaf! {LogicalPostgresQuery}
70impl Distill for LogicalPostgresQuery {
71 fn distill<'a>(&self) -> XmlNode<'a> {
72 let fields = vec![("columns", column_names_pretty(self.schema()))];
73 childless_record("LogicalPostgresQuery", fields)
74 }
75}
76
77impl ColPrunable for LogicalPostgresQuery {
78 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
79 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
80 }
81}
82
83impl ExprRewritable for LogicalPostgresQuery {}
84
85impl ExprVisitable for LogicalPostgresQuery {}
86
87impl PredicatePushdown for LogicalPostgresQuery {
88 fn predicate_pushdown(
89 &self,
90 predicate: Condition,
91 _ctx: &mut PredicatePushdownContext,
92 ) -> PlanRef {
93 LogicalFilter::create(self.clone().into(), predicate)
95 }
96}
97
98impl ToBatch for LogicalPostgresQuery {
99 fn to_batch(&self) -> Result<PlanRef> {
100 Ok(BatchPostgresQuery::new(self.core.clone()).into())
101 }
102}
103
104impl ToStream for LogicalPostgresQuery {
105 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
106 bail!("postgres_query function is not supported in streaming mode")
107 }
108
109 fn logical_rewrite_for_stream(
110 &self,
111 _ctx: &mut RewriteStreamContext,
112 ) -> Result<(PlanRef, ColIndexMapping)> {
113 bail!("postgres_query function is not supported in streaming mode")
114 }
115}