risingwave_frontend/optimizer/plan_node/
stream_fs_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20use risingwave_pb::stream_plan::{PbStreamFsFetch, StreamFsFetchNode};
21
22use super::stream::prelude::*;
23use super::{PlanBase, PlanRef, PlanTreeNodeUnary};
24use crate::catalog::source_catalog::SourceCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::utils::{Distill, childless_record};
27use crate::optimizer::plan_node::{ExprRewritable, StreamNode, generic};
28use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31/// Fetch files from filesystem/s3/iceberg.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamFsFetch {
34    pub base: PlanBase<Stream>,
35    input: PlanRef,
36    core: generic::Source,
37}
38
39impl PlanTreeNodeUnary for StreamFsFetch {
40    fn input(&self) -> PlanRef {
41        self.input.clone()
42    }
43
44    fn clone_with_input(&self, input: PlanRef) -> Self {
45        Self::new(input, self.core.clone())
46    }
47}
48impl_plan_tree_node_for_unary! { StreamFsFetch }
49
50impl StreamFsFetch {
51    pub fn new(input: PlanRef, source: generic::Source) -> Self {
52        let base = PlanBase::new_stream_with_core(
53            &source,
54            Distribution::SomeShard,
55            source.catalog.as_ref().is_none_or(|s| s.append_only),
56            false,
57            WatermarkColumns::new(),
58            MonotonicityMap::new(), // TODO: derive monotonicity
59        );
60
61        Self {
62            base,
63            input,
64            core: source,
65        }
66    }
67
68    fn get_columns(&self) -> Vec<&str> {
69        self.core
70            .column_catalog
71            .iter()
72            .map(|column| column.name())
73            .collect()
74    }
75
76    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
77        self.core.catalog.clone()
78    }
79}
80
81impl Distill for StreamFsFetch {
82    fn distill<'a>(&self) -> XmlNode<'a> {
83        let columns = self
84            .get_columns()
85            .iter()
86            .map(|ele| Pretty::from(ele.to_string()))
87            .collect();
88        let col = Pretty::Array(columns);
89        childless_record("StreamFsFetch", vec![("columns", col)])
90    }
91}
92
93impl ExprRewritable for StreamFsFetch {}
94
95impl ExprVisitable for StreamFsFetch {}
96
97impl StreamNode for StreamFsFetch {
98    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
99        // `StreamFsFetch` is same as source in proto def, so the following code is the same as `StreamSource`
100        let source_catalog = self.source_catalog();
101
102        let source_inner = source_catalog.map(|source_catalog| {
103            let (with_properties, secret_refs) =
104                source_catalog.with_properties.clone().into_parts();
105            PbStreamFsFetch {
106                source_id: source_catalog.id,
107                source_name: source_catalog.name.clone(),
108                state_table: Some(
109                    generic::Source::infer_internal_table_catalog(true)
110                        .with_id(state.gen_table_id_wrapped())
111                        .to_internal_table_prost(),
112                ),
113                info: Some(source_catalog.info.clone()),
114                row_id_index: self.core.row_id_index.map(|index| index as _),
115                columns: self
116                    .core
117                    .column_catalog
118                    .iter()
119                    .map(|c| c.to_protobuf())
120                    .collect_vec(),
121                with_properties,
122                rate_limit: source_catalog.rate_limit,
123                secret_refs,
124            }
125        });
126        NodeBody::StreamFsFetch(Box::new(StreamFsFetchNode {
127            node_inner: source_inner,
128        }))
129    }
130}