risingwave_frontend/optimizer/plan_node/
stream_fs_fetch.rs1use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbStreamFsFetch, StreamFsFetchNode};
21
22use super::stream::prelude::*;
23use super::{PlanBase, PlanRef, PlanTreeNodeUnary};
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::utils::{Distill, childless_record};
27use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamFsFetch {
34 pub base: PlanBase<Stream>,
35 input: PlanRef,
36 core: generic::Source,
37}
38
39impl PlanTreeNodeUnary for StreamFsFetch {
40 fn input(&self) -> PlanRef {
41 self.input.clone()
42 }
43
44 fn clone_with_input(&self, input: PlanRef) -> Self {
45 Self::new(input, self.core.clone())
46 }
47}
48impl_plan_tree_node_for_unary! { StreamFsFetch }
49
50impl StreamFsFetch {
51 pub fn new(input: PlanRef, source: generic::Source) -> Self {
52 let base = PlanBase::new_stream_with_core(
53 &source,
54 Distribution::SomeShard,
55 source.catalog.as_ref().is_none_or(|s| s.append_only),
56 false,
57 WatermarkColumns::new(),
58 MonotonicityMap::new(), );
60
61 Self {
62 base,
63 input,
64 core: source,
65 }
66 }
67
68 fn get_columns(&self) -> Vec<&str> {
69 self.core
70 .column_catalog
71 .iter()
72 .map(|column| column.name())
73 .collect()
74 }
75
76 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
77 self.core.catalog.clone()
78 }
79}
80
81impl Distill for StreamFsFetch {
82 fn distill<'a>(&self) -> XmlNode<'a> {
83 let columns = self
84 .get_columns()
85 .iter()
86 .map(|ele| Pretty::from(ele.to_string()))
87 .collect();
88 let col = Pretty::Array(columns);
89 childless_record("StreamFsFetch", vec![("columns", col)])
90 }
91}
92
93impl ExprRewritable for StreamFsFetch {}
94
95impl ExprVisitable for StreamFsFetch {}
96
97impl StreamNode for StreamFsFetch {
98 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
99 let source_catalog = self.source_catalog();
101
102 let source_inner = source_catalog.map(|source_catalog| {
103 let (with_properties, secret_refs) =
104 source_catalog.with_properties.clone().into_parts();
105 PbStreamFsFetch {
106 source_id: source_catalog.id,
107 source_name: source_catalog.name.clone(),
108 state_table: Some(
109 generic::Source::infer_internal_table_catalog(true)
110 .with_id(state.gen_table_id_wrapped())
111 .to_internal_table_prost(),
112 ),
113 info: Some(source_catalog.info.clone()),
114 row_id_index: self.core.row_id_index.map(|index| index as _),
115 columns: self
116 .core
117 .column_catalog
118 .iter()
119 .map(|c| c.to_protobuf())
120 .collect_vec(),
121 with_properties,
122 rate_limit: source_catalog.rate_limit,
123 secret_refs,
124 }
125 });
126 NodeBody::StreamFsFetch(Box::new(StreamFsFetchNode {
127 node_inner: source_inner,
128 }))
129 }
130}