risingwave_frontend/optimizer/plan_node/
batch_source.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::SourceNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_sqlparser::ast::AsOf;
21
22use super::batch::prelude::*;
23use super::utils::{Distill, childless_record, column_names_pretty};
24use super::{
25 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
26 generic,
27};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::error::Result;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::property::{Distribution, Order};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct BatchSource {
38 pub base: PlanBase<Batch>,
39 pub core: generic::Source,
40}
41
42impl BatchSource {
43 pub fn new(core: generic::Source) -> Self {
44 let base = PlanBase::new_batch_with_core(
45 &core,
46 Distribution::Single,
48 Order::any(),
49 );
50
51 Self { base, core }
52 }
53
54 pub fn column_names(&self) -> Vec<&str> {
55 self.schema().names_str()
56 }
57
58 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
59 self.core.catalog.clone()
60 }
61
62 pub fn as_of(&self) -> Option<AsOf> {
63 self.core.as_of.clone()
64 }
65
66 pub fn clone_with_dist(&self) -> Self {
67 let base = self
68 .base
69 .clone_with_new_distribution(Distribution::SomeShard);
70 Self {
71 base,
72 core: self.core.clone(),
73 }
74 }
75}
76
77impl_plan_tree_node_for_leaf! { Batch, BatchSource }
78
79impl Distill for BatchSource {
80 fn distill<'a>(&self) -> XmlNode<'a> {
81 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
82 let mut fields = vec![
83 ("source", src),
84 ("columns", column_names_pretty(self.schema())),
85 ];
86 if let Some(as_of) = &self.core.as_of {
87 fields.push(("as_of", Pretty::debug(as_of)));
88 }
89 childless_record("BatchSource", fields)
90 }
91}
92
93impl ToLocalBatch for BatchSource {
94 fn to_local(&self) -> Result<PlanRef> {
95 Ok(self.clone_with_dist().into())
96 }
97}
98
99impl ToDistributedBatch for BatchSource {
100 fn to_distributed(&self) -> Result<PlanRef> {
101 Ok(self.clone_with_dist().into())
102 }
103}
104
105impl ToBatchPb for BatchSource {
106 fn to_batch_prost_body(&self) -> NodeBody {
107 let source_catalog = self.source_catalog().unwrap();
108 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
109 NodeBody::Source(SourceNode {
110 source_id: source_catalog.id,
111 info: Some(source_catalog.info.clone()),
112 columns: self
113 .core
114 .column_catalog
115 .iter()
116 .map(|c| c.to_protobuf())
117 .collect(),
118 with_properties,
119 split: vec![],
120 secret_refs,
121 })
122 }
123}
124
125impl ExprRewritable<Batch> for BatchSource {}
126
127impl ExprVisitable for BatchSource {}