risingwave_stream/executor/source/batch_source/
batch_posix_fs_fetch.rs1use std::collections::VecDeque;
16use std::io::BufRead;
17use std::path::Path;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, StreamExt};
22use futures_async_stream::try_stream;
23use risingwave_common::id::TableId;
24use risingwave_common::types::{JsonbVal, ScalarRef};
25use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
26use risingwave_connector::source::filesystem::OpendalFsSplit;
27use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
28use risingwave_connector::source::{
29 ConnectorProperties, SourceChunkStream, SourceContext, SourceCtrlOpts, SourceMessage,
30 SourceMessageEvent, SourceMeta, SourceReaderEvent, SplitMetaData,
31};
32use thiserror_ext::AsReport;
33use tokio::fs;
34
35use crate::common::rate_limit::limited_chunk_size;
36use crate::executor::prelude::*;
37use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
38use crate::executor::stream_reader::StreamReaderWithPause;
39use crate::task::LocalBarrierManager;
40
/// Upper bound on the number of queued files handed to a single batched reader.
const BATCH_SIZE: usize = 1_000;
43
44fn into_data_chunk_stream(
45 stream: impl futures::Stream<Item = risingwave_connector::error::ConnectorResult<SourceReaderEvent>>
46 + Send
47 + 'static,
48) -> impl SourceChunkStream {
49 stream
50 .try_filter_map(|event| async move {
51 Ok(match event {
52 SourceReaderEvent::DataChunk(chunk) => Some(chunk),
53 SourceReaderEvent::SplitProgress(_) => None,
54 })
55 })
56 .boxed()
57}
58
/// Fetch executor for the batch POSIX filesystem source.
///
/// Receives file assignments (file name plus serialized split) from `upstream`, reads those
/// files from the filesystem below the source root in batches, parses them line by line and
/// emits the parsed rows as stream chunks. During a refresh it reports to the barrier manager
/// once listing has finished and all queued files have been processed.
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Source core; moved out when the executor starts running.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor delivering file assignments; moved out when the executor starts running.
    upstream: Option<Executor>,

    /// Optional rate limit; it determines the parser chunk size and whether split transactions
    /// are enabled.
    rate_limit_rps: Option<u32>,

    /// Used to report that a refresh load has finished.
    barrier_manager: LocalBarrierManager,

    /// Files waiting to be read, as `(file name, serialized split)` pairs.
    file_queue: VecDeque<(String, JsonbVal)>,

    /// Table against which load completion is reported.
    associated_table_id: TableId,
}
90
/// Result of reading one file in a batch.
struct FileData {
    /// Chunks parsed from the file; may be empty (e.g. for an empty file).
    chunks: Vec<StreamChunk>,

    /// The split's `name`; it is joined onto the source root to locate the file on disk.
    file_path: String,
}
99
100impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
101 pub fn new(
102 actor_ctx: ActorContextRef,
103 stream_source_core: StreamSourceCore<S>,
104 upstream: Executor,
105 rate_limit_rps: Option<u32>,
106 barrier_manager: LocalBarrierManager,
107 associated_table_id: Option<TableId>,
108 ) -> Self {
109 assert!(associated_table_id.is_some());
110 Self {
111 actor_ctx,
112 stream_source_core: Some(stream_source_core),
113 upstream: Some(upstream),
114 rate_limit_rps,
115 barrier_manager,
116 file_queue: VecDeque::new(),
117 associated_table_id: associated_table_id.unwrap(),
118 }
119 }
120
121 fn replace_with_new_batch_reader<const BIASED: bool>(
124 files_in_progress: &mut usize,
125 file_queue: &mut VecDeque<(String, JsonbVal)>,
126 stream: &mut StreamReaderWithPause<BIASED, FileData>,
127 properties: ConnectorProperties,
128 parser_config: ParserConfig,
129 source_ctx: Arc<SourceContext>,
130 ) -> StreamExecutorResult<()> {
131 let mut batch = Vec::with_capacity(BATCH_SIZE);
133
134 for _ in 0..BATCH_SIZE {
135 if let Some((_file_path, split_json)) = file_queue.pop_front() {
136 let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
137 batch.push(split);
138 } else {
139 break;
140 }
141 }
142
143 if batch.is_empty() {
144 stream.replace_data_stream(stream::pending().boxed());
146 } else {
147 *files_in_progress += batch.len();
148 let batch_reader =
149 Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
150 stream.replace_data_stream(batch_reader.boxed());
151 }
152
153 Ok(())
154 }
155
156 #[try_stream(ok = FileData, error = StreamExecutorError)]
158 async fn build_batched_stream_reader(
159 batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
160 properties: ConnectorProperties,
161 parser_config: ParserConfig,
162 source_ctx: Arc<SourceContext>,
163 ) {
164 let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
165 unreachable!()
166 };
167
168 let root_path = batch_posix_fs_properties.root.clone();
169
170 for split in batch {
171 let file_path = split.name.clone();
172 let full_path = Path::new(&root_path).join(&file_path);
173
174 let content = match fs::read(&full_path).await {
176 Ok(content) => content,
177 Err(e) => {
178 tracing::error!(
179 error = %e.as_report(),
180 file_path = %full_path.display(),
181 "Failed to read file"
182 );
183 continue;
184 }
185 };
186
187 if content.is_empty() {
188 yield FileData {
190 chunks: vec![],
191 file_path,
192 };
193 continue;
194 }
195
196 let mut chunks = vec![];
197
198 for line in content.lines() {
200 let line =
201 line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;
202
203 let message = SourceMessage {
204 key: None,
205 payload: Some(line.as_bytes().to_vec()),
206 offset: "0".to_owned(),
207 split_id: split.id(),
208 meta: SourceMeta::Empty,
209 };
210
211 let parser =
214 ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
215 .await?;
216
217 let chunk_stream = into_data_chunk_stream(parser.parse_stream_with_events(
218 Box::pin(futures::stream::once(async {
219 Ok(SourceMessageEvent::Data(vec![message]))
220 })),
221 ));
222
223 #[for_await]
224 for chunk in chunk_stream {
225 chunks.push(chunk?);
226 }
227 }
228
229 yield FileData { chunks, file_path };
230 }
231 }
232
233 #[try_stream(ok = Message, error = StreamExecutorError)]
234 async fn into_stream(mut self) {
235 let mut upstream = self.upstream.take().unwrap().execute();
236 let barrier = expect_first_barrier(&mut upstream).await?;
237 let is_pause_on_startup = barrier.is_pause_on_startup();
238 yield Message::Barrier(barrier);
239
240 let mut core = self.stream_source_core.take().unwrap();
241
242 let source_desc_builder = core.source_desc_builder.take().unwrap();
244
245 let source_desc = source_desc_builder
246 .build()
247 .map_err(StreamExecutorError::connector_error)?;
248 let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
249 else {
250 unreachable!("Partition and offset columns must be set.");
251 };
252
253 let properties = source_desc.source.config.clone();
254 let parser_config = ParserConfig {
255 common: CommonParserConfig {
256 rw_columns: source_desc.columns.clone(),
257 },
258 specific: source_desc.source.parser_config.clone(),
259 };
260
261 let mut files_in_progress: usize = 0;
262 let mut stream =
263 StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());
264
265 if is_pause_on_startup {
266 stream.pause_stream();
267 }
268
269 let mut list_finished = false;
273 let mut is_refreshing = false;
274 let mut file_queue = self.file_queue;
275
276 let actor_ctx = self.actor_ctx.clone();
278 let barrier_manager = self.barrier_manager.clone();
279 let rate_limit_rps = &mut self.rate_limit_rps;
280
281 let source_ctx = Arc::new(SourceContext::new(
282 actor_ctx.id,
283 core.source_id,
284 actor_ctx.fragment_id,
285 core.source_name.clone(),
286 source_desc.metrics.clone(),
287 SourceCtrlOpts {
288 chunk_size: limited_chunk_size(*rate_limit_rps),
289 split_txn: rate_limit_rps.is_some(),
290 },
291 source_desc.source.config.clone(),
292 None,
293 ));
294
295 while let Some(msg) = stream.next().await {
296 match msg {
297 Err(e) => {
298 tracing::error!(error = %e.as_report(), "Fetch Error");
299 files_in_progress = 0;
300 }
301 Ok(msg) => match msg {
302 Either::Left(msg) => match msg {
304 Message::Barrier(barrier) => {
305 let need_rebuild_reader = false;
306
307 if let Some(mutation) = barrier.mutation.as_deref() {
308 match mutation {
309 Mutation::Pause => stream.pause_stream(),
310 Mutation::Resume => stream.resume_stream(),
311 Mutation::RefreshStart {
312 associated_source_id,
313 ..
314 } if associated_source_id.as_raw_id()
315 == core.source_id.as_raw_id() =>
316 {
317 tracing::info!(
318 ?barrier.epoch,
319 actor_id = %actor_ctx.id,
320 source_id = %core.source_id,
321 queue_len = file_queue.len(),
322 files_in_progress,
323 "RefreshStart: clearing state and aborting workload"
324 );
325
326 file_queue.clear();
328 files_in_progress = 0;
329 list_finished = false;
330 is_refreshing = true;
331
332 stream.replace_data_stream(stream::pending().boxed());
334 }
335 Mutation::ListFinish {
336 associated_source_id,
337 } if associated_source_id.as_raw_id()
338 == core.source_id.as_raw_id() =>
339 {
340 tracing::info!(
342 ?barrier.epoch,
343 actor_id = %actor_ctx.id,
344 source_id = %core.source_id,
345 "received ListFinish mutation"
346 );
347 list_finished = true;
348 }
349 _ => (),
350 }
351 }
352
353 let epoch = barrier.epoch;
354
355 if files_in_progress == 0
363 && file_queue.is_empty()
364 && list_finished
365 && is_refreshing
366 && barrier.is_checkpoint()
367 {
368 tracing::info!(
369 ?epoch,
370 actor_id = %actor_ctx.id,
371 source_id = %core.source_id,
372 "Reporting source load finished"
373 );
374 barrier_manager.report_source_load_finished(
375 epoch,
376 actor_ctx.id,
377 self.associated_table_id,
378 core.source_id,
379 );
380 list_finished = false;
382 is_refreshing = false;
383 }
384
385 yield Message::Barrier(barrier);
387
388 if files_in_progress == 0 || need_rebuild_reader {
390 Self::replace_with_new_batch_reader(
391 &mut files_in_progress,
392 &mut file_queue,
393 &mut stream,
394 properties.clone(),
395 parser_config.clone(),
396 source_ctx.clone(),
397 )?;
398 }
399 }
400 Message::Chunk(chunk) => {
403 for row in chunk.data_chunk().rows() {
404 let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
405 let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
406 file_queue.push_back((file_name, split));
407 }
408
409 tracing::debug!(
410 actor_id = %actor_ctx.id,
411 queue_len = file_queue.len(),
412 "Added file assignments to queue"
413 );
414 }
415 Message::Watermark(_) => unreachable!(),
416 },
417 Either::Right(FileData { chunks, file_path }) => {
419 files_in_progress -= 1;
421 tracing::debug!(
422 file_path = ?file_path,
423 "Processed file"
424 );
425
426 for chunk in chunks {
428 let chunk = prune_additional_cols(
429 &chunk,
430 &[split_idx, offset_idx],
431 &source_desc.columns,
432 );
433 yield Message::Chunk(chunk);
434 }
435 }
436 },
437 }
438 }
439 }
440}
441
442impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
443 fn execute(self: Box<Self>) -> BoxedMessageStream {
444 self.into_stream().boxed()
445 }
446}
447
448impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
449 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
450 if let Some(core) = &self.stream_source_core {
451 f.debug_struct("BatchPosixFsFetchExecutor")
452 .field("source_id", &core.source_id)
453 .field("column_ids", &core.column_ids)
454 .finish()
455 } else {
456 f.debug_struct("BatchPosixFsFetchExecutor").finish()
457 }
458 }
459}