// risingwave_frontend/optimizer/plan_node/batch_file_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::{AzblobFileScanNode, FileScanNode, GcsFileScanNode};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record, column_names_pretty};
22use super::{
23    ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, Order};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchFileScan {
31    pub base: PlanBase<Batch>,
32    pub core: generic::FileScanBackend,
33}
34
35impl BatchFileScan {
36    pub fn new(core: generic::FileScanBackend) -> Self {
37        let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
38
39        Self { base, core }
40    }
41
42    pub fn column_names(&self) -> Vec<&str> {
43        self.schema().names_str()
44    }
45
46    pub fn clone_with_dist(&self) -> Self {
47        let base = self
48            .base
49            .clone_with_new_distribution(Distribution::SomeShard);
50        Self {
51            base,
52            core: self.core.clone(),
53        }
54    }
55}
56
57impl_plan_tree_node_for_leaf! { BatchFileScan }
58
59impl Distill for BatchFileScan {
60    fn distill<'a>(&self) -> XmlNode<'a> {
61        let fields = vec![("columns", column_names_pretty(self.schema()))];
62        childless_record("BatchFileScan", fields)
63    }
64}
65
66impl ToLocalBatch for BatchFileScan {
67    fn to_local(&self) -> Result<PlanRef> {
68        Ok(self.clone_with_dist().into())
69    }
70}
71
72impl ToDistributedBatch for BatchFileScan {
73    fn to_distributed(&self) -> Result<PlanRef> {
74        Ok(self.clone_with_dist().into())
75    }
76}
77
78impl ToBatchPb for BatchFileScan {
79    fn to_batch_prost_body(&self) -> NodeBody {
80        match &self.core {
81            generic::FileScanBackend::FileScan(file_scan) => NodeBody::FileScan(FileScanNode {
82                columns: file_scan
83                    .columns()
84                    .into_iter()
85                    .map(|col| col.to_protobuf())
86                    .collect(),
87                file_format: match file_scan.file_format {
88                    generic::FileFormat::Parquet => FileFormat::Parquet as i32,
89                },
90                storage_type: StorageType::S3 as i32,
91
92                s3_region: file_scan.s3_region.clone(),
93                s3_access_key: file_scan.s3_access_key.clone(),
94                s3_secret_key: file_scan.s3_secret_key.clone(),
95                file_location: file_scan.file_location.clone(),
96                s3_endpoint: file_scan.s3_endpoint.clone(),
97            }),
98            generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
99                NodeBody::GcsFileScan(GcsFileScanNode {
100                    columns: gcs_file_scan
101                        .columns()
102                        .into_iter()
103                        .map(|col| col.to_protobuf())
104                        .collect(),
105                    file_format: match gcs_file_scan.file_format {
106                        generic::FileFormat::Parquet => FileFormat::Parquet as i32,
107                    },
108                    credential: gcs_file_scan.credential.clone(),
109                    file_location: gcs_file_scan.file_location.clone(),
110                })
111            }
112
113            generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => {
114                NodeBody::AzblobFileScan(AzblobFileScanNode {
115                    columns: azblob_file_scan
116                        .columns()
117                        .into_iter()
118                        .map(|col| col.to_protobuf())
119                        .collect(),
120                    file_format: match azblob_file_scan.file_format {
121                        generic::FileFormat::Parquet => FileFormat::Parquet as i32,
122                    },
123                    account_name: azblob_file_scan.account_name.clone(),
124                    account_key: azblob_file_scan.account_key.clone(),
125                    endpoint: azblob_file_scan.endpoint.clone(),
126                    file_location: azblob_file_scan.file_location.clone(),
127                })
128            }
129        }
130    }
131}
132
// No overrides: `BatchFileScan` relies entirely on the trait's default method implementations.
impl ExprRewritable for BatchFileScan {}

// No overrides: `BatchFileScan` relies entirely on the trait's default method implementations.
impl ExprVisitable for BatchFileScan {}