risingwave_frontend/optimizer/plan_node/generic/
postgres_query.rs1use educe::Educe;
16use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
17
18use super::GenericPlanNode;
19use crate::optimizer::optimizer_context::OptimizerContextRef;
20use crate::optimizer::property::FunctionalDependencySet;
21
/// Generic (core) plan-node data for a query against a PostgreSQL server.
///
/// NOTE(review): the purpose above is inferred from the type and field names
/// only; nothing in this file shows how the fields are consumed — confirm
/// against the code that constructs and executes this node.
///
/// `PartialEq`, `Eq` and `Hash` are generated by `educe` over every field
/// except `ctx`, which is explicitly ignored for both comparison and hashing.
///
/// NOTE(review): `Debug` is derived with the standard derive, so `password`
/// is included verbatim in any `{:?}` rendering of this struct — confirm that
/// no log line or plan dump can expose it.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct PostgresQuery {
    /// Output schema of this node; `GenericPlanNode::schema` returns a clone of it.
    pub schema: Schema,
    /// Presumably the host name of the PostgreSQL server — verify against caller.
    pub hostname: String,
    /// Presumably the server port; note it is stored as a `String`, not a number.
    pub port: String,
    /// Presumably the user name used to authenticate — verify against caller.
    pub username: String,
    /// Presumably the password used to authenticate (see the `Debug` note above).
    pub password: String,
    /// Presumably the name of the database to connect to — verify against caller.
    pub database: String,
    /// Presumably the query text sent to the server — verify against caller.
    pub query: String,
    /// Optional SSL mode setting; accepted values are not visible in this file.
    pub ssl_mode: Option<String>,
    /// Optional SSL root certificate setting; whether this is a path or the
    /// certificate contents is not visible in this file — TODO confirm.
    pub ssl_root_cert: Option<String>,

    /// Optimizer context handle; excluded from equality and hashing below.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
39
40impl GenericPlanNode for PostgresQuery {
41 fn schema(&self) -> Schema {
42 self.schema.clone()
43 }
44
45 fn stream_key(&self) -> Option<Vec<usize>> {
46 None
47 }
48
49 fn ctx(&self) -> OptimizerContextRef {
50 self.ctx.clone()
51 }
52
53 fn functional_dependency(&self) -> FunctionalDependencySet {
54 FunctionalDependencySet::new(self.schema.len())
55 }
56}
57
58impl PostgresQuery {
59 pub fn columns(&self) -> Vec<ColumnDesc> {
60 self.schema
61 .fields
62 .iter()
63 .enumerate()
64 .map(|(i, f)| {
65 ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
66 })
67 .collect()
68 }
69}