risingwave_batch_executors/executor/gcs_file_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_gcs_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

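/// File formats that the GCS file scan can decode. Only Parquet is supported.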
#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

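/// Batch executor that scans Parquet files stored in Google Cloud Storage and
/// emits their contents as [`DataChunk`]s.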
pub struct GcsFileScanExecutor {
    file_format: FileFormat,
    file_location: Vec<String>,
    gcs_credential: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for GcsFileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl GcsFileScanExecutor {
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        gcs_credential: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            gcs_credential,
            batch_size,
            schema,
            identity,
        }
    }

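    /// Reads each file in `file_location` from GCS and yields its contents as
    /// [`DataChunk`]s, propagating connector and decoding errors as [`BatchError`]s.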
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            // Resolve the bucket and object name, then build a GCS operator for this file.
            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
            let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            // Forward the data portion of each chunk read from the Parquet file.
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

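/// Builds a [`GcsFileScanExecutor`] from a `GcsFileScan` plan node.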
pub struct GcsFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        // The planner must have produced a GcsFileScan node for this executor.
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::GcsFileScan
        )?;

        Ok(Box::new(GcsFileScanExecutor::new(
            // Only Parquet is supported; an unspecified format is rejected as unreachable.
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.credential.clone(),
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}