// risingwave_frontend/optimizer/plan_node/logical_file_scan.rs

use pretty_xmlish::XmlNode;
16use risingwave_common::bail;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchFileScan, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
23 PredicatePushdown, ToBatch, ToStream, generic,
24};
25use crate::OptimizerContextRef;
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::column_names_pretty;
29use crate::optimizer::plan_node::{
30 ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext,
31 ToStreamContext,
32};
33use crate::utils::{ColIndexMapping, Condition};
34
/// Logical plan node that scans files from an external object store
/// (S3, GCS, or Azure Blob — see `generic::FileScanBackend`).
/// Only the Parquet file format is supported (enforced by the constructors).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalFileScan {
    // Shared logical-plan-node state (schema, ctx, ...), derived from `core`.
    pub base: PlanBase<Logical>,
    // Backend-specific scan description (which store, credentials, file list).
    pub core: generic::FileScanBackend,
}
40
41impl LogicalFileScan {
42 pub fn new_s3_logical_file_scan(
43 ctx: OptimizerContextRef,
44 schema: Schema,
45 file_format: String,
46 storage_type: String,
47 s3_region: String,
48 s3_access_key: String,
49 s3_secret_key: String,
50 file_location: Vec<String>,
51 s3_endpoint: String,
52 ) -> Self {
53 assert!("parquet".eq_ignore_ascii_case(&file_format));
54 assert!("s3".eq_ignore_ascii_case(&storage_type));
55 let storage_type = generic::StorageType::S3;
56
57 let core = generic::FileScanBackend::FileScan(generic::FileScan {
58 schema,
59 file_format: generic::FileFormat::Parquet,
60 storage_type,
61 s3_region,
62 s3_access_key,
63 s3_secret_key,
64 file_location,
65 ctx,
66 s3_endpoint,
67 });
68
69 let base = PlanBase::new_logical_with_core(&core);
70
71 LogicalFileScan { base, core }
72 }
73
74 pub fn new_gcs_logical_file_scan(
75 ctx: OptimizerContextRef,
76 schema: Schema,
77 file_format: String,
78 storage_type: String,
79 credential: String,
80 file_location: Vec<String>,
81 ) -> Self {
82 assert!("parquet".eq_ignore_ascii_case(&file_format));
83 assert!("gcs".eq_ignore_ascii_case(&storage_type));
84
85 let core = generic::FileScanBackend::GcsFileScan(generic::GcsFileScan {
86 schema,
87 file_format: generic::FileFormat::Parquet,
88 storage_type: generic::StorageType::Gcs,
89 credential,
90 file_location,
91 ctx,
92 });
93
94 let base = PlanBase::new_logical_with_core(&core);
95 LogicalFileScan { base, core }
96 }
97
98 pub fn new_azblob_logical_file_scan(
99 ctx: OptimizerContextRef,
100 schema: Schema,
101 file_format: String,
102 storage_type: String,
103 account_name: String,
104 account_key: String,
105 endpoint: String,
106 file_location: Vec<String>,
107 ) -> Self {
108 assert!("parquet".eq_ignore_ascii_case(&file_format));
109 assert!("azblob".eq_ignore_ascii_case(&storage_type));
110
111 let core = generic::FileScanBackend::AzblobFileScan(generic::AzblobFileScan {
112 schema,
113 file_format: generic::FileFormat::Parquet,
114 storage_type: generic::StorageType::Azblob,
115 account_name,
116 account_key,
117 endpoint,
118 file_location,
119 ctx,
120 });
121
122 let base = PlanBase::new_logical_with_core(&core);
123
124 LogicalFileScan { base, core }
125 }
126}
127
// `LogicalFileScan` has no inputs; derive the leaf-node plan-tree boilerplate.
impl_plan_tree_node_for_leaf! {LogicalFileScan}
129impl Distill for LogicalFileScan {
130 fn distill<'a>(&self) -> XmlNode<'a> {
131 let fields = vec![("columns", column_names_pretty(self.schema()))];
132 childless_record("LogicalFileScan", fields)
133 }
134}
135
136impl ColPrunable for LogicalFileScan {
137 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
138 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
139 }
140}
141
// This node embeds no expressions, so the default no-op rewrite suffices.
impl ExprRewritable for LogicalFileScan {}
143
// This node embeds no expressions, so the default no-op visit suffices.
impl ExprVisitable for LogicalFileScan {}
145
146impl PredicatePushdown for LogicalFileScan {
147 fn predicate_pushdown(
148 &self,
149 predicate: Condition,
150 _ctx: &mut PredicatePushdownContext,
151 ) -> PlanRef {
152 LogicalFilter::create(self.clone().into(), predicate)
154 }
155}
156
157impl ToBatch for LogicalFileScan {
158 fn to_batch(&self) -> Result<PlanRef> {
159 Ok(BatchFileScan::new(self.core.clone()).into())
160 }
161}
162
impl ToStream for LogicalFileScan {
    // File scan is a batch-only feature: both streaming conversion entry
    // points unconditionally fail with a user-facing error.
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        bail!("file_scan function is not supported in streaming mode")
    }

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        bail!("file_scan function is not supported in streaming mode")
    }
}
174}