risingwave_frontend/optimizer/plan_node/
batch_source.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::SourceNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_sqlparser::ast::AsOf;
21
22use super::batch::prelude::*;
23use super::utils::{Distill, childless_record, column_names_pretty};
24use super::{
25 ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
26};
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::error::Result;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::property::{Distribution, Order};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct BatchSource {
37 pub base: PlanBase<Batch>,
38 pub core: generic::Source,
39}
40
41impl BatchSource {
42 pub fn new(core: generic::Source) -> Self {
43 let base = PlanBase::new_batch_with_core(
44 &core,
45 Distribution::Single,
47 Order::any(),
48 );
49
50 Self { base, core }
51 }
52
53 pub fn column_names(&self) -> Vec<&str> {
54 self.schema().names_str()
55 }
56
57 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
58 self.core.catalog.clone()
59 }
60
61 pub fn as_of(&self) -> Option<AsOf> {
62 self.core.as_of.clone()
63 }
64
65 pub fn clone_with_dist(&self) -> Self {
66 let base = self
67 .base
68 .clone_with_new_distribution(Distribution::SomeShard);
69 Self {
70 base,
71 core: self.core.clone(),
72 }
73 }
74}
75
76impl_plan_tree_node_for_leaf! { BatchSource }
77
78impl Distill for BatchSource {
79 fn distill<'a>(&self) -> XmlNode<'a> {
80 let src = Pretty::from(self.source_catalog().unwrap().name.clone());
81 let mut fields = vec![
82 ("source", src),
83 ("columns", column_names_pretty(self.schema())),
84 ];
85 if let Some(as_of) = &self.core.as_of {
86 fields.push(("as_of", Pretty::debug(as_of)));
87 }
88 childless_record("BatchSource", fields)
89 }
90}
91
92impl ToLocalBatch for BatchSource {
93 fn to_local(&self) -> Result<PlanRef> {
94 Ok(self.clone_with_dist().into())
95 }
96}
97
98impl ToDistributedBatch for BatchSource {
99 fn to_distributed(&self) -> Result<PlanRef> {
100 Ok(self.clone_with_dist().into())
101 }
102}
103
104impl ToBatchPb for BatchSource {
105 fn to_batch_prost_body(&self) -> NodeBody {
106 let source_catalog = self.source_catalog().unwrap();
107 let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
108 NodeBody::Source(SourceNode {
109 source_id: source_catalog.id,
110 info: Some(source_catalog.info.clone()),
111 columns: self
112 .core
113 .column_catalog
114 .iter()
115 .map(|c| c.to_protobuf())
116 .collect(),
117 with_properties,
118 split: vec![],
119 secret_refs,
120 })
121 }
122}
123
124impl ExprRewritable for BatchSource {}
125
126impl ExprVisitable for BatchSource {}