risingwave_frontend/optimizer/plan_node/
batch_file_scan.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::{AzblobFileScanNode, FileScanNode, GcsFileScanNode};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record, column_names_pretty};
22use super::{
23 ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, generic,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, Order};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct BatchFileScan {
31 pub base: PlanBase<Batch>,
32 pub core: generic::FileScanBackend,
33}
34
35impl BatchFileScan {
36 pub fn new(core: generic::FileScanBackend) -> Self {
37 let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
38
39 Self { base, core }
40 }
41
42 pub fn column_names(&self) -> Vec<&str> {
43 self.schema().names_str()
44 }
45
46 pub fn clone_with_dist(&self) -> Self {
47 let base = self
48 .base
49 .clone_with_new_distribution(Distribution::SomeShard);
50 Self {
51 base,
52 core: self.core.clone(),
53 }
54 }
55}
56
57impl_plan_tree_node_for_leaf! { BatchFileScan }
58
59impl Distill for BatchFileScan {
60 fn distill<'a>(&self) -> XmlNode<'a> {
61 let fields = vec![("columns", column_names_pretty(self.schema()))];
62 childless_record("BatchFileScan", fields)
63 }
64}
65
66impl ToLocalBatch for BatchFileScan {
67 fn to_local(&self) -> Result<PlanRef> {
68 Ok(self.clone_with_dist().into())
69 }
70}
71
72impl ToDistributedBatch for BatchFileScan {
73 fn to_distributed(&self) -> Result<PlanRef> {
74 Ok(self.clone_with_dist().into())
75 }
76}
77
78impl ToBatchPb for BatchFileScan {
79 fn to_batch_prost_body(&self) -> NodeBody {
80 match &self.core {
81 generic::FileScanBackend::FileScan(file_scan) => NodeBody::FileScan(FileScanNode {
82 columns: file_scan
83 .columns()
84 .into_iter()
85 .map(|col| col.to_protobuf())
86 .collect(),
87 file_format: match file_scan.file_format {
88 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
89 },
90 storage_type: StorageType::S3 as i32,
91
92 s3_region: file_scan.s3_region.clone(),
93 s3_access_key: file_scan.s3_access_key.clone(),
94 s3_secret_key: file_scan.s3_secret_key.clone(),
95 file_location: file_scan.file_location.clone(),
96 s3_endpoint: file_scan.s3_endpoint.clone(),
97 }),
98 generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
99 NodeBody::GcsFileScan(GcsFileScanNode {
100 columns: gcs_file_scan
101 .columns()
102 .into_iter()
103 .map(|col| col.to_protobuf())
104 .collect(),
105 file_format: match gcs_file_scan.file_format {
106 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
107 },
108 credential: gcs_file_scan.credential.clone(),
109 file_location: gcs_file_scan.file_location.clone(),
110 })
111 }
112
113 generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => {
114 NodeBody::AzblobFileScan(AzblobFileScanNode {
115 columns: azblob_file_scan
116 .columns()
117 .into_iter()
118 .map(|col| col.to_protobuf())
119 .collect(),
120 file_format: match azblob_file_scan.file_format {
121 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
122 },
123 account_name: azblob_file_scan.account_name.clone(),
124 account_key: azblob_file_scan.account_key.clone(),
125 endpoint: azblob_file_scan.endpoint.clone(),
126 file_location: azblob_file_scan.file_location.clone(),
127 })
128 }
129 }
130 }
131}
132
133impl ExprRewritable for BatchFileScan {}
134
135impl ExprVisitable for BatchFileScan {}