// risingwave_batch_executors/executor/s3_file_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
27
/// File formats understood by the S3 file scan executor.
///
/// Only Parquet is supported today; `S3FileScanExecutor::do_execute`
/// asserts this before reading any file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    Parquet,
}
32
/// Batch executor that reads Parquet files from S3 and yields their rows
/// as [`DataChunk`]s.
pub struct S3FileScanExecutor {
    // Format of the files to read; only `FileFormat::Parquet` is supported
    // (asserted in `do_execute`).
    file_format: FileFormat,
    // Locations of the files to scan; each is parsed into a bucket and an
    // object key by `extract_bucket_and_file_name`.
    file_location: Vec<String>,
    // S3 connection settings, forwarded to `new_s3_operator` per file.
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    s3_endpoint: String,
    // Row-count budget per chunk, forwarded to `read_parquet_file`.
    batch_size: usize,
    // Output schema reported via `Executor::schema`.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
}
45
46impl Executor for S3FileScanExecutor {
47 fn schema(&self) -> &risingwave_common::catalog::Schema {
48 &self.schema
49 }
50
51 fn identity(&self) -> &str {
52 &self.identity
53 }
54
55 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
56 self.do_execute().boxed()
57 }
58}
59
60impl S3FileScanExecutor {
61 pub fn new(
62 file_format: FileFormat,
63 file_location: Vec<String>,
64 s3_region: String,
65 s3_access_key: String,
66 s3_secret_key: String,
67 batch_size: usize,
68 schema: Schema,
69 identity: String,
70 s3_endpoint: String,
71 ) -> Self {
72 Self {
73 file_format,
74 file_location,
75 s3_region,
76 s3_access_key,
77 s3_secret_key,
78 s3_endpoint,
79 batch_size,
80 schema,
81 identity,
82 }
83 }
84
85 #[try_stream(ok = DataChunk, error = BatchError)]
86 async fn do_execute(self: Box<Self>) {
87 assert_eq!(self.file_format, FileFormat::Parquet);
88 for file in self.file_location {
89 let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?;
90 let op = new_s3_operator(
91 self.s3_region.clone(),
92 self.s3_access_key.clone(),
93 self.s3_secret_key.clone(),
94 bucket.clone(),
95 self.s3_endpoint.clone(),
96 )?;
97 let chunk_stream = read_parquet_file(
98 op,
99 file_name,
100 None,
101 None,
102 false,
103 self.batch_size,
104 0,
105 None,
106 None,
107 )
108 .await?;
109 #[for_await]
110 for stream_chunk in chunk_stream {
111 let stream_chunk = stream_chunk?;
112 let (data_chunk, _) = stream_chunk.into_parts();
113 yield data_chunk;
114 }
115 }
116 }
117}
118
119pub struct FileScanExecutorBuilder {}
120
121impl BoxedExecutorBuilder for FileScanExecutorBuilder {
122 async fn new_boxed_executor(
123 source: &ExecutorBuilder<'_>,
124 _inputs: Vec<BoxedExecutor>,
125 ) -> crate::error::Result<BoxedExecutor> {
126 let file_scan_node = try_match_expand!(
127 source.plan_node().get_node_body().unwrap(),
128 NodeBody::FileScan
129 )?;
130
131 Ok(Box::new(S3FileScanExecutor::new(
132 match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
133 file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
134 file_scan_node::FileFormat::Unspecified => unreachable!(),
135 },
136 file_scan_node.file_location.clone(),
137 file_scan_node.s3_region.clone(),
138 file_scan_node.s3_access_key.clone(),
139 file_scan_node.s3_secret_key.clone(),
140 source.context().get_config().developer.chunk_size,
141 Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
142 source.plan_node().get_identity().clone(),
143 file_scan_node.s3_endpoint.clone(),
144 )))
145 }
146}