// risingwave_batch_executors/executor/gcs_file_scan.rs
use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_connector::source::iceberg::{
20 FileScanBackend, extract_bucket_and_file_name, new_gcs_operator, read_parquet_file,
21};
22use risingwave_pb::batch_plan::file_scan_node;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::BatchError;
26use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
27
/// File formats supported by the GCS file-scan executor.
///
/// Currently only Parquet is supported; `GcsFileScanExecutor::do_execute`
/// asserts this before reading any file.
// Fieldless enum: derive `Clone`/`Copy`/`Eq` eagerly (backward-compatible,
// lets callers compare and pass it by value without moves).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Apache Parquet columnar file format.
    Parquet,
}
32
/// Batch executor that reads Parquet files stored in GCS and emits their
/// contents as [`DataChunk`]s.
pub struct GcsFileScanExecutor {
    // Only `FileFormat::Parquet` is supported; asserted in `do_execute`.
    file_format: FileFormat,
    // Locations of the files to scan; each entry is split into bucket and
    // object name by `extract_bucket_and_file_name` at execution time.
    file_location: Vec<String>,
    // Credential string handed to `new_gcs_operator` for every file.
    gcs_credential: String,
    // Chunk row capacity passed to `read_parquet_file`.
    batch_size: usize,
    // Output schema of the scanned data.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
}
42
43impl Executor for GcsFileScanExecutor {
44 fn schema(&self) -> &risingwave_common::catalog::Schema {
45 &self.schema
46 }
47
48 fn identity(&self) -> &str {
49 &self.identity
50 }
51
52 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53 self.do_execute().boxed()
54 }
55}
56
impl GcsFileScanExecutor {
    /// Creates a new `GcsFileScanExecutor`.
    ///
    /// `file_location` lists the files to scan; `batch_size` bounds the row
    /// count of each emitted chunk; `schema` describes the output columns.
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        gcs_credential: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            gcs_credential,
            batch_size,
            schema,
            identity,
        }
    }

    /// Scans every file in `file_location` sequentially, yielding one
    /// [`DataChunk`] at a time. Fails fast on the first error from URI
    /// parsing, operator creation, or file reading.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        // Only Parquet is supported; any other format is a planner bug.
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            // Split the location into GCS bucket and object name.
            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
            // A fresh operator per file: the bucket may differ between files,
            // so the credential is cloned for each operator.
            let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
            // NOTE(review): the `None`/`false`/`0` arguments presumably select
            // "read the whole file with no projection/offset" defaults —
            // confirm against `read_parquet_file`'s signature.
            let chunk_stream = read_parquet_file(
                op,
                file_name,
                None,
                None,
                false,
                self.batch_size,
                0,
                None,
                None,
            )
            .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Keep only the data part; the stream-specific part of the
                // chunk is irrelevant for a batch executor.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}
103
104pub struct GcsFileScanExecutorBuilder {}
105
impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
    /// Builds a boxed [`GcsFileScanExecutor`] from a `GcsFileScan` plan node.
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        // The node body must be `GcsFileScan`; `try_match_expand!` turns a
        // mismatch into an error instead of panicking.
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::GcsFileScan
        )?;

        Ok(Box::new(GcsFileScanExecutor::new(
            // Map the protobuf file-format enum to the local one.
            // `Unspecified` is presumably never emitted by the planner,
            // hence unreachable here.
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.credential.clone(),
            // Chunk size comes from the batch developer config.
            source.context().get_config().developer.chunk_size,
            // Output schema is derived from the plan node's column catalog.
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}