risingwave_connector/source/filesystem/opendal_source/
batch_posix_fs_source.rs1use std::io::BufRead;
16use std::path::Path;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use futures::future::BoxFuture;
21use futures_async_stream::try_stream;
22use glob;
23use risingwave_common::array::StreamChunk;
24use risingwave_common::types::JsonbVal;
25use serde::{Deserialize, Serialize};
26use tokio::fs;
27
28use super::BatchPosixFsProperties;
29use crate::error::ConnectorResult;
30use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
31use crate::source::batch::BatchSourceSplit;
32use crate::source::{
33 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceMessage,
34 SourceMeta, SplitEnumerator, SplitId, SplitMetaData, SplitReader,
35};
36
/// A split of the batch POSIX filesystem source.
///
/// The enumerator in this file emits exactly one split per source, with `file_path` set to the
/// configured root directory.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct BatchPosixFsSplit {
    /// Path associated with this split (the enumerator stores the source root here).
    /// NOTE(review): the reader lists files from `properties.root` and does not consult this
    /// field — confirm whether it is meant to scope the split.
    pub file_path: String,
    /// Identifier returned by `SplitMetaData::id`.
    pub split_id: SplitId,
    /// Whether the split has been fully consumed.
    ///
    /// Not serialized (`#[serde(skip)]`), so a split restored from JSON starts as unfinished.
    #[serde(skip)]
    pub finished: bool,
}
54
55impl SplitMetaData for BatchPosixFsSplit {
56 fn id(&self) -> SplitId {
57 self.split_id.clone()
58 }
59
60 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
61 serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
62 }
63
64 fn encode_to_json(&self) -> JsonbVal {
65 serde_json::to_value(self.clone()).unwrap().into()
66 }
67
68 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
69 Ok(())
71 }
72}
73
74impl BatchSourceSplit for BatchPosixFsSplit {
75 fn finished(&self) -> bool {
76 self.finished
77 }
78
79 fn finish(&mut self) {
80 self.finished = true;
81 }
82
83 fn refresh(&mut self) {
84 self.finished = false;
85 }
86}
87
88impl BatchPosixFsSplit {
89 pub fn new(file_path: String, split_id: SplitId) -> Self {
90 Self {
91 file_path,
92 split_id,
93 finished: false,
94 }
95 }
96
97 pub fn mark_finished(&mut self) {
98 self.finished = true;
99 }
100}
101
/// Split enumerator for the batch POSIX filesystem source.
#[derive(Debug)]
pub struct BatchPosixFsEnumerator {
    /// Source properties; `root` is the directory checked by `list_splits`.
    properties: BatchPosixFsProperties,
}
107
108#[async_trait]
109impl SplitEnumerator for BatchPosixFsEnumerator {
110 type Properties = BatchPosixFsProperties;
111 type Split = BatchPosixFsSplit;
112
113 async fn new(
114 properties: Self::Properties,
115 _context: SourceEnumeratorContextRef,
116 ) -> ConnectorResult<Self> {
117 Ok(Self { properties })
118 }
119
120 async fn list_splits(&mut self) -> ConnectorResult<Vec<BatchPosixFsSplit>> {
121 let root_path = Path::new(&self.properties.root);
122
123 if !root_path.exists() {
124 return Err(anyhow!("Root directory does not exist: {}", self.properties.root).into());
125 }
126
127 Ok(vec![BatchPosixFsSplit::new(
129 self.properties.root.clone(), "114514".into(), )])
132 }
133}
134
/// Split reader for the batch POSIX filesystem source.
#[derive(Debug)]
pub struct BatchPosixFsReader {
    /// Source properties; `root` and `match_pattern` drive file discovery.
    properties: BatchPosixFsProperties,
    /// Splits assigned to this reader.
    splits: Vec<BatchPosixFsSplit>,
    /// Configuration used to build the byte-stream parser.
    parser_config: ParserConfig,
    /// Source context handed to the parser.
    source_ctx: SourceContextRef,
}
143
144#[async_trait]
145impl SplitReader for BatchPosixFsReader {
146 type Properties = BatchPosixFsProperties;
147 type Split = BatchPosixFsSplit;
148
149 async fn new(
150 properties: Self::Properties,
151 splits: Vec<Self::Split>,
152 parser_config: ParserConfig,
153 source_ctx: SourceContextRef,
154 _columns: Option<Vec<Column>>,
155 ) -> ConnectorResult<Self> {
156 Ok(Self {
157 properties,
158 splits,
159 parser_config,
160 source_ctx,
161 })
162 }
163
164 fn into_stream(self) -> BoxSourceChunkStream {
165 self.into_stream_inner()
166 }
167}
168
169impl BatchPosixFsReader {
170 #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
171 async fn into_stream_inner(self) {
172 for split in &self.splits {
173 let files = self.collect_files_for_split(split).await?;
174 tracing::debug!(?files, ?split, "BatchPosixFsReader: colleted files");
175
176 for file_path in files {
177 let full_path = Path::new(&self.properties.root).join(&file_path);
178
179 let content = fs::read(&full_path)
181 .await
182 .with_context(|| format!("Failed to read file: {}", full_path.display()))?;
183
184 if content.is_empty() {
185 continue;
186 }
187
188 for line in content.lines() {
190 let line = line?;
191 let message = SourceMessage {
193 key: None,
194 payload: Some(line.as_bytes().to_vec()),
195 offset: "0".to_owned(), split_id: split.id(),
197 meta: SourceMeta::Empty,
198 };
199
200 let parser = ByteStreamSourceParserImpl::create(
202 self.parser_config.clone(),
203 self.source_ctx.clone(),
204 )
205 .await?;
206 let chunk_stream = parser
207 .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));
208
209 #[for_await]
210 for chunk in chunk_stream {
211 yield chunk?;
212 }
213 }
214 }
215 }
216
217 tracing::info!("BatchPosixFs has finished reading all files");
219 }
220
221 async fn collect_files_for_split(
222 &self,
223 _split: &BatchPosixFsSplit,
224 ) -> ConnectorResult<Vec<String>> {
225 let root_path = Path::new(&self.properties.root);
226 let mut files = Vec::new();
227
228 let pattern = self
229 .properties
230 .match_pattern
231 .as_ref()
232 .map(|p| glob::Pattern::new(p))
233 .transpose()
234 .with_context(|| {
235 format!("Invalid match_pattern: {:?}", self.properties.match_pattern)
236 })?;
237
238 Self::collect_files_recursive(root_path, root_path, &pattern, &mut files).await?;
239 Ok(files)
240 }
241
242 fn collect_files_recursive<'a>(
243 current_dir: &'a Path,
244 root_path: &'a Path,
245 pattern: &'a Option<glob::Pattern>,
246 files: &'a mut Vec<String>,
247 ) -> BoxFuture<'a, ConnectorResult<()>> {
248 Box::pin(async move {
249 let mut entries = fs::read_dir(current_dir)
250 .await
251 .with_context(|| format!("Failed to read directory: {}", current_dir.display()))?;
252
253 while let Some(entry) = entries.next_entry().await.with_context(|| {
254 format!(
255 "Failed to read directory entry in: {}",
256 current_dir.display()
257 )
258 })? {
259 let path = entry.path();
260
261 if path.is_dir() {
262 Self::collect_files_recursive(&path, root_path, pattern, files).await?;
264 } else if path.is_file() {
265 let relative_path = path.strip_prefix(root_path).with_context(|| {
266 format!("Failed to get relative path for: {}", path.display())
267 })?;
268
269 let relative_path_str = relative_path.to_string_lossy().to_string();
270
271 if let Some(pattern) = pattern {
273 if pattern.matches(&relative_path_str) {
274 files.push(relative_path_str);
275 }
276 } else {
277 files.push(relative_path_str);
278 }
279 }
280 }
281
282 Ok(())
283 })
284 }
285}