risingwave_frontend/optimizer/plan_node/
batch_postgres_query.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::PostgresQueryNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record, column_names_pretty};
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
23 generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, Order};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchPostgresQuery {
31 pub base: PlanBase<Batch>,
32 pub core: generic::PostgresQuery,
33}
34
35impl BatchPostgresQuery {
36 pub fn new(core: generic::PostgresQuery) -> Self {
37 let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
38
39 Self { base, core }
40 }
41
42 pub fn column_names(&self) -> Vec<&str> {
43 self.schema().names_str()
44 }
45
46 pub fn clone_with_dist(&self) -> Self {
47 let base = self.base.clone_with_new_distribution(Distribution::Single);
48 Self {
49 base,
50 core: self.core.clone(),
51 }
52 }
53}
54
55impl_plan_tree_node_for_leaf! { Batch, BatchPostgresQuery }
56
57impl Distill for BatchPostgresQuery {
58 fn distill<'a>(&self) -> XmlNode<'a> {
59 let fields = vec![("columns", column_names_pretty(self.schema()))];
60 childless_record("BatchPostgresQuery", fields)
61 }
62}
63
64impl ToLocalBatch for BatchPostgresQuery {
65 fn to_local(&self) -> Result<PlanRef> {
66 Ok(self.clone_with_dist().into())
67 }
68}
69
70impl ToDistributedBatch for BatchPostgresQuery {
71 fn to_distributed(&self) -> Result<PlanRef> {
72 Ok(self.clone_with_dist().into())
73 }
74}
75
76impl ToBatchPb for BatchPostgresQuery {
77 fn to_batch_prost_body(&self) -> NodeBody {
78 NodeBody::PostgresQuery(PostgresQueryNode {
79 columns: self
80 .core
81 .columns()
82 .iter()
83 .map(|c| c.to_protobuf())
84 .collect(),
85 hostname: self.core.hostname.clone(),
86 port: self.core.port.clone(),
87 username: self.core.username.clone(),
88 password: self.core.password.clone(),
89 database: self.core.database.clone(),
90 query: self.core.query.clone(),
91 })
92 }
93}
94
95impl ExprRewritable<Batch> for BatchPostgresQuery {}
96
97impl ExprVisitable for BatchPostgresQuery {}