risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

use std::collections::VecDeque;
use std::io::BufRead;
use std::path::Path;

use either::Either;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::types::{JsonbVal, ScalarRef};
use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
use risingwave_connector::source::{
    ConnectorProperties, SourceContext, SourceCtrlOpts, SourceMessage, SourceMeta, SplitMetaData,
};
use thiserror_ext::AsReport;
use tokio::fs;

use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

const BATCH_SIZE: usize = 1000;

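/// Executor that fetches files assigned by the upstream list executor from the local
/// (POSIX) filesystem, parses them with the source's parser, and emits the resulting
/// stream chunks downstream.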
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source core, holding the source id, name, columns, and descriptor builder.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that lists files and sends the file assignments.
    upstream: Option<Executor>,

    /// Rate limit in rows per second, if configured.
    rate_limit_rps: Option<u32>,

    /// Used to report that the source load has finished.
    barrier_manager: LocalBarrierManager,

    /// Queue of pending file assignments: (file path, split encoded as JSON).
    file_queue: VecDeque<(String, JsonbVal)>,
}

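/// Output of reading and parsing a single file in the batch reader.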
struct FileData {
    /// Stream chunks parsed from the file (empty if the file had no content).
    chunks: Vec<StreamChunk>,

    /// Path of the file, relative to the configured root directory.
    file_path: String,
}

impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            barrier_manager,
            file_queue: VecDeque::new(),
        }
    }

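    /// Pops up to `BATCH_SIZE` file assignments from the queue and replaces the data
    /// stream with a reader over them; installs a pending stream if the queue is empty.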
    fn replace_with_new_batch_reader<const BIASED: bool>(
        files_in_progress: &mut usize,
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, FileData>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(BATCH_SIZE);

        for _ in 0..BATCH_SIZE {
            if let Some((_file_path, split_json)) = file_queue.pop_front() {
                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
                batch.push(split);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *files_in_progress += batch.len();
            let batch_reader =
                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

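    /// Reads each split's file from the local filesystem, parses it line by line, and
    /// yields one [`FileData`] per file. Files that fail to read are logged and skipped.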
    #[try_stream(ok = FileData, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) {
        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
            unreachable!()
        };

        let root_path = batch_posix_fs_properties.root.clone();

        for split in batch {
            let file_path = split.name.clone();
            let full_path = Path::new(&root_path).join(&file_path);

            let content = match fs::read(&full_path).await {
                Ok(content) => content,
                Err(e) => {
                    tracing::error!(
                        error = %e.as_report(),
                        file_path = %full_path.display(),
                        "Failed to read file"
                    );
                    continue;
                }
            };

            if content.is_empty() {
                yield FileData {
                    chunks: vec![],
                    file_path,
                };
                continue;
            }

            let mut chunks = vec![];

            for line in content.lines() {
                let line =
                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;

                let message = SourceMessage {
                    key: None,
                    payload: Some(line.as_bytes().to_vec()),
                    offset: "0".to_owned(),
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                };

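                // Parse this single-line message and collect the resulting chunks.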
                let parser =
                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
                        .await?;

                let chunk_stream = parser
                    .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));

                #[for_await]
                for chunk in chunk_stream {
                    chunks.push(chunk?);
                }
            }

            yield FileData { chunks, file_path };
        }
    }

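    /// Merges the upstream control/assignment stream with the file-reading stream and
    /// yields barriers and parsed data chunks downstream.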
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        let properties = source_desc.source.config.clone();
        let parser_config = ParserConfig {
            common: CommonParserConfig {
                rw_columns: source_desc.columns.clone(),
            },
            specific: source_desc.source.parser_config.clone(),
        };

        let mut files_in_progress: usize = 0;
        let mut stream =
            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());

        if is_pause_on_startup {
            stream.pause_stream();
        }

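        // Whether the upstream list phase has finished and whether a refresh is in
        // progress; both must hold (with no pending files) before reporting load finished.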
        let mut list_finished = false;
        let mut is_refreshing = false;
        let mut file_queue = self.file_queue;

        let actor_ctx = self.actor_ctx.clone();
        let barrier_manager = self.barrier_manager.clone();
        let rate_limit_rps = &mut self.rate_limit_rps;

        let source_ctx = Arc::new(SourceContext::new(
            actor_ctx.id,
            core.source_id,
            actor_ctx.fragment_id,
            core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(*rate_limit_rps),
                split_txn: rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        ));

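        // Main loop: `Either::Left` carries upstream messages (barriers and file
        // assignments), `Either::Right` carries parsed data for a completed file.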
        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    files_in_progress = 0;
                }
                Ok(msg) => match msg {
                    Either::Left(msg) => match msg {
                        Message::Barrier(barrier) => {
                            let need_rebuild_reader = false;

                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    Mutation::RefreshStart {
                                        associated_source_id,
                                        ..
                                    } if associated_source_id.table_id()
                                        == core.source_id.table_id() =>
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = actor_ctx.id,
                                            source_id = %core.source_id,
                                            queue_len = file_queue.len(),
                                            files_in_progress,
                                            "RefreshStart: clearing state and aborting workload"
                                        );

                                        file_queue.clear();
                                        files_in_progress = 0;
                                        list_finished = false;
                                        is_refreshing = true;

                                        stream.replace_data_stream(stream::pending().boxed());
                                    }
                                    Mutation::ListFinish {
                                        associated_source_id,
                                    } => {
                                        if associated_source_id.table_id()
                                            == core.source_id.table_id()
                                        {
                                            tracing::info!(
                                                ?barrier.epoch,
                                                actor_id = actor_ctx.id,
                                                source_id = %core.source_id,
                                                "received ListFinish mutation"
                                            );
                                            list_finished = true;
                                        }
                                    }
                                    _ => (),
                                }
                            }

                            let epoch = barrier.epoch;

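                            // Once a refresh is in progress, report the load as finished
                            // when the list phase is done and no files remain queued or in flight.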
                            if files_in_progress == 0
                                && file_queue.is_empty()
                                && list_finished
                                && is_refreshing
                            {
                                tracing::info!(
                                    ?epoch,
                                    actor_id = actor_ctx.id,
                                    source_id = %core.source_id,
                                    "Reporting source load finished"
                                );
                                barrier_manager.report_source_load_finished(
                                    epoch,
                                    actor_ctx.id,
                                    core.source_id.table_id(),
                                    core.source_id.table_id(),
                                );
                                list_finished = false;
                                is_refreshing = false;
                            }

                            yield Message::Barrier(barrier);

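                            // Rebuild the batch reader when the current batch is exhausted.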
                            if files_in_progress == 0 || need_rebuild_reader {
                                Self::replace_with_new_batch_reader(
                                    &mut files_in_progress,
                                    &mut file_queue,
                                    &mut stream,
                                    properties.clone(),
                                    parser_config.clone(),
                                    source_ctx.clone(),
                                )?;
                            }
                        }
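                        // A chunk from upstream carries file assignments: (file name, split JSON).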
374 Message::Chunk(chunk) => {
377 for row in chunk.data_chunk().rows() {
378 let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
379 let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
380 file_queue.push_back((file_name, split));
381 }
382
383 tracing::debug!(
384 actor_id = actor_ctx.id,
385 queue_len = file_queue.len(),
386 "Added file assignments to queue"
387 );
388 }
389 Message::Watermark(_) => unreachable!(),
390 },
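                    // Parsed data for one file arrived from the batch reader.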
391 Either::Right(FileData { chunks, file_path }) => {
393 files_in_progress -= 1;
395 tracing::debug!(
396 file_path = ?file_path,
397 "Processed file"
398 );
399
400 for chunk in chunks {
402 let chunk = prune_additional_cols(
403 &chunk,
404 split_idx,
405 offset_idx,
406 &source_desc.columns,
407 );
408 yield Message::Chunk(chunk);
409 }
410 }
411 },
412 }
413 }
414 }
415}
impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchPosixFsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchPosixFsFetchExecutor").finish()
        }
    }
}