risingwave_frontend/optimizer/plan_node/logical_file_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;

use super::generic::GenericPlanRef;
use super::utils::{Distill, childless_record};
use super::{
    BatchFileScan, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
    PredicatePushdown, ToBatch, ToStream, generic,
};
use crate::OptimizerContextRef;
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
    ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
    ToStreamContext,
};
use crate::utils::{ColIndexMapping, Condition};

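/// `LogicalFileScan` is a leaf plan node that scans files from external
/// storage (S3, GCS, or Azure Blob Storage). Only the Parquet file format is
/// supported, and the scan is batch-only: converting it to a stream plan
/// bails out.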
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalFileScan {
    pub base: PlanBase<Logical>,
    pub core: generic::FileScanBackend,
}

impl LogicalFileScan {
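    /// Creates a `LogicalFileScan` backed by S3.
    ///
    /// Panics if `file_format` is not `"parquet"` or `storage_type` is not
    /// `"s3"` (both checked case-insensitively).
    ///
    /// A minimal construction sketch; the context, schema, credentials, and
    /// endpoint below are placeholders:
    ///
    /// ```ignore
    /// let scan = LogicalFileScan::new_s3_logical_file_scan(
    ///     ctx,                // OptimizerContextRef
    ///     schema,             // output schema of the scan
    ///     "parquet".into(),   // the only accepted file format
    ///     "s3".into(),
    ///     "us-east-1".into(),
    ///     access_key,
    ///     secret_key,
    ///     vec!["s3://bucket/data.parquet".into()],
    ///     endpoint,
    /// );
    /// ```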
    pub fn new_s3_logical_file_scan(
        ctx: OptimizerContextRef,
        schema: Schema,
        file_format: String,
        storage_type: String,
        s3_region: String,
        s3_access_key: String,
        s3_secret_key: String,
        file_location: Vec<String>,
        s3_endpoint: String,
    ) -> Self {
        assert!("parquet".eq_ignore_ascii_case(&file_format));
        assert!("s3".eq_ignore_ascii_case(&storage_type));
        let storage_type = generic::StorageType::S3;

        let core = generic::FileScanBackend::FileScan(generic::FileScan {
            schema,
            file_format: generic::FileFormat::Parquet,
            storage_type,
            s3_region,
            s3_access_key,
            s3_secret_key,
            file_location,
            ctx,
            s3_endpoint,
        });

        let base = PlanBase::new_logical_with_core(&core);

        LogicalFileScan { base, core }
    }

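    /// Creates a `LogicalFileScan` backed by Google Cloud Storage.
    ///
    /// Panics if `file_format` is not `"parquet"` or `storage_type` is not
    /// `"gcs"` (both checked case-insensitively).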
    pub fn new_gcs_logical_file_scan(
        ctx: OptimizerContextRef,
        schema: Schema,
        file_format: String,
        storage_type: String,
        credential: String,
        file_location: Vec<String>,
    ) -> Self {
        assert!("parquet".eq_ignore_ascii_case(&file_format));
        assert!("gcs".eq_ignore_ascii_case(&storage_type));

        let core = generic::FileScanBackend::GcsFileScan(generic::GcsFileScan {
            schema,
            file_format: generic::FileFormat::Parquet,
            storage_type: generic::StorageType::Gcs,
            credential,
            file_location,
            ctx,
        });

        let base = PlanBase::new_logical_with_core(&core);
        LogicalFileScan { base, core }
    }

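    /// Creates a `LogicalFileScan` backed by Azure Blob Storage.
    ///
    /// Panics if `file_format` is not `"parquet"` or `storage_type` is not
    /// `"azblob"` (both checked case-insensitively).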
    pub fn new_azblob_logical_file_scan(
        ctx: OptimizerContextRef,
        schema: Schema,
        file_format: String,
        storage_type: String,
        account_name: String,
        account_key: String,
        endpoint: String,
        file_location: Vec<String>,
    ) -> Self {
        assert!("parquet".eq_ignore_ascii_case(&file_format));
        assert!("azblob".eq_ignore_ascii_case(&storage_type));

        let core = generic::FileScanBackend::AzblobFileScan(generic::AzblobFileScan {
            schema,
            file_format: generic::FileFormat::Parquet,
            storage_type: generic::StorageType::Azblob,
            account_name,
            account_key,
            endpoint,
            file_location,
            ctx,
        });

        let base = PlanBase::new_logical_with_core(&core);

        LogicalFileScan { base, core }
    }
}

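// A file scan has no inputs, so it is registered as a leaf of the plan tree.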
impl_plan_tree_node_for_leaf! {LogicalFileScan}

impl Distill for LogicalFileScan {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let fields = vec![("columns", column_names_pretty(self.schema()))];
        childless_record("LogicalFileScan", fields)
    }
}

impl ColPrunable for LogicalFileScan {
    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
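        // The scan always produces the full file schema; column pruning is
        // expressed by wrapping the scan in a projection instead.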
        LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
    }
}

impl ExprRewritable for LogicalFileScan {}

impl ExprVisitable for LogicalFileScan {}

impl PredicatePushdown for LogicalFileScan {
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        _ctx: &mut PredicatePushdownContext,
    ) -> PlanRef {
        // No pushdown: the predicate stays in a `LogicalFilter` on top of
        // the scan.
        LogicalFilter::create(self.clone().into(), predicate)
    }
}

impl ToBatch for LogicalFileScan {
    fn to_batch(&self) -> Result<PlanRef> {
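        // Batch conversion reuses the same backend description (`core`).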
        Ok(BatchFileScan::new(self.core.clone()).into())
    }
}

impl ToStream for LogicalFileScan {
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        bail!("file_scan function is not supported in streaming mode")
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        bail!("file_scan function is not supported in streaming mode")
    }
}