risingwave_frontend/optimizer/plan_node/
batch_postgres_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::PostgresQueryNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record, column_names_pretty};
21use super::{
22    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
23    generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, Order};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchPostgresQuery {
31    pub base: PlanBase<Batch>,
32    pub core: generic::PostgresQuery,
33}
34
35impl BatchPostgresQuery {
36    pub fn new(core: generic::PostgresQuery) -> Self {
37        let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
38
39        Self { base, core }
40    }
41
42    pub fn column_names(&self) -> Vec<&str> {
43        self.schema().names_str()
44    }
45
46    pub fn clone_with_dist(&self) -> Self {
47        let base = self.base.clone_with_new_distribution(Distribution::Single);
48        Self {
49            base,
50            core: self.core.clone(),
51        }
52    }
53}
54
55impl_plan_tree_node_for_leaf! { Batch, BatchPostgresQuery }
56
57impl Distill for BatchPostgresQuery {
58    fn distill<'a>(&self) -> XmlNode<'a> {
59        let fields = vec![("columns", column_names_pretty(self.schema()))];
60        childless_record("BatchPostgresQuery", fields)
61    }
62}
63
64impl ToLocalBatch for BatchPostgresQuery {
65    fn to_local(&self) -> Result<PlanRef> {
66        Ok(self.clone_with_dist().into())
67    }
68}
69
70impl ToDistributedBatch for BatchPostgresQuery {
71    fn to_distributed(&self) -> Result<PlanRef> {
72        Ok(self.clone_with_dist().into())
73    }
74}
75
76impl ToBatchPb for BatchPostgresQuery {
77    fn to_batch_prost_body(&self) -> NodeBody {
78        NodeBody::PostgresQuery(PostgresQueryNode {
79            columns: self
80                .core
81                .columns()
82                .iter()
83                .map(|c| c.to_protobuf())
84                .collect(),
85            hostname: self.core.hostname.clone(),
86            port: self.core.port.clone(),
87            username: self.core.username.clone(),
88            password: self.core.password.clone(),
89            database: self.core.database.clone(),
90            query: self.core.query.clone(),
91        })
92    }
93}
94
95impl ExprRewritable<Batch> for BatchPostgresQuery {}
96
97impl ExprVisitable for BatchPostgresQuery {}