risingwave_frontend/optimizer/plan_node/
batch_file_scan.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::file_scan_node::{FileFormat, StorageType};
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::{AzblobFileScanNode, FileScanNode, GcsFileScanNode};
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record, column_names_pretty};
22use super::{
23 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
24 generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Distribution, Order};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchFileScan {
32 pub base: PlanBase<Batch>,
33 pub core: generic::FileScanBackend,
34}
35
36impl BatchFileScan {
37 pub fn new(core: generic::FileScanBackend) -> Self {
38 let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any());
39
40 Self { base, core }
41 }
42
43 pub fn column_names(&self) -> Vec<&str> {
44 self.schema().names_str()
45 }
46
47 pub fn clone_with_dist(&self) -> Self {
48 let base = self
49 .base
50 .clone_with_new_distribution(Distribution::SomeShard);
51 Self {
52 base,
53 core: self.core.clone(),
54 }
55 }
56}
57
58impl_plan_tree_node_for_leaf! { Batch, BatchFileScan }
59
60impl Distill for BatchFileScan {
61 fn distill<'a>(&self) -> XmlNode<'a> {
62 let fields = vec![("columns", column_names_pretty(self.schema()))];
63 childless_record("BatchFileScan", fields)
64 }
65}
66
67impl ToLocalBatch for BatchFileScan {
68 fn to_local(&self) -> Result<PlanRef> {
69 Ok(self.clone_with_dist().into())
70 }
71}
72
73impl ToDistributedBatch for BatchFileScan {
74 fn to_distributed(&self) -> Result<PlanRef> {
75 Ok(self.clone_with_dist().into())
76 }
77}
78
79impl ToBatchPb for BatchFileScan {
80 fn to_batch_prost_body(&self) -> NodeBody {
81 match &self.core {
82 generic::FileScanBackend::FileScan(file_scan) => NodeBody::FileScan(FileScanNode {
83 columns: file_scan
84 .columns()
85 .into_iter()
86 .map(|col| col.to_protobuf())
87 .collect(),
88 file_format: match file_scan.file_format {
89 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
90 },
91 storage_type: StorageType::S3 as i32,
92
93 s3_region: file_scan.s3_region.clone(),
94 s3_access_key: file_scan.s3_access_key.clone(),
95 s3_secret_key: file_scan.s3_secret_key.clone(),
96 file_location: file_scan.file_location.clone(),
97 s3_endpoint: file_scan.s3_endpoint.clone(),
98 }),
99 generic::FileScanBackend::GcsFileScan(gcs_file_scan) => {
100 NodeBody::GcsFileScan(GcsFileScanNode {
101 columns: gcs_file_scan
102 .columns()
103 .into_iter()
104 .map(|col| col.to_protobuf())
105 .collect(),
106 file_format: match gcs_file_scan.file_format {
107 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
108 },
109 credential: gcs_file_scan.credential.clone(),
110 file_location: gcs_file_scan.file_location.clone(),
111 })
112 }
113
114 generic::FileScanBackend::AzblobFileScan(azblob_file_scan) => {
115 NodeBody::AzblobFileScan(AzblobFileScanNode {
116 columns: azblob_file_scan
117 .columns()
118 .into_iter()
119 .map(|col| col.to_protobuf())
120 .collect(),
121 file_format: match azblob_file_scan.file_format {
122 generic::FileFormat::Parquet => FileFormat::Parquet as i32,
123 },
124 account_name: azblob_file_scan.account_name.clone(),
125 account_key: azblob_file_scan.account_key.clone(),
126 endpoint: azblob_file_scan.endpoint.clone(),
127 file_location: azblob_file_scan.file_location.clone(),
128 })
129 }
130 }
131 }
132}
133
134impl ExprRewritable<Batch> for BatchFileScan {}
135
136impl ExprVisitable for BatchFileScan {}