risingwave_batch_executors/executor/s3_file_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

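/// File format accepted by the batch file scan; only Parquet is handled here.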
#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

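/// Batch executor that scans Parquet files stored on S3 and yields their
/// contents as `DataChunk`s.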
pub struct S3FileScanExecutor {
    file_format: FileFormat,
    file_location: Vec<String>,
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    s3_endpoint: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

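// `Executor` trait plumbing: expose the schema and identity, and box the
// stream produced by `do_execute` below.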
impl Executor for S3FileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl S3FileScanExecutor {
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        s3_region: String,
        s3_access_key: String,
        s3_secret_key: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
        s3_endpoint: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            s3_region,
            s3_access_key,
            s3_secret_key,
            s3_endpoint,
            batch_size,
            schema,
            identity,
        }
    }

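    /// For each file in `file_location`: resolve the bucket and key, build an
    /// S3 operator, read the file back as Parquet, and yield the `DataChunk`
    /// part of every chunk the reader returns.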
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?;
            let op = new_s3_operator(
                self.s3_region.clone(),
                self.s3_access_key.clone(),
                self.s3_secret_key.clone(),
                bucket.clone(),
                self.s3_endpoint.clone(),
            )?;
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

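/// Builds an `S3FileScanExecutor` from a `FileScan` plan node.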
pub struct FileScanExecutorBuilder {}

impl BoxedExecutorBuilder for FileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::FileScan
        )?;

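        // S3 connection info, file list, and column schema all come straight
        // from the plan node; the batch size comes from the config's
        // `developer.chunk_size`.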
        Ok(Box::new(S3FileScanExecutor::new(
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.s3_region.clone(),
            file_scan_node.s3_access_key.clone(),
            file_scan_node.s3_secret_key.clone(),
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
            file_scan_node.s3_endpoint.clone(),
        )))
    }
}