// risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

use std::collections::VecDeque;
use std::io::BufRead;
use std::path::Path;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::id::TableId;
use risingwave_common::types::{JsonbVal, ScalarRef};
use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
use risingwave_connector::source::{
    ConnectorProperties, SourceChunkStream, SourceContext, SourceCtrlOpts, SourceMessage,
    SourceMessageEvent, SourceMeta, SourceReaderEvent, SplitMetaData,
};
use thiserror_ext::AsReport;
use tokio::fs;

use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;
/// Maximum number of file splits pulled from the queue and handed to a single
/// batched stream reader at a time.
const BATCH_SIZE: usize = 1000;

44fn into_data_chunk_stream(
45 stream: impl futures::Stream<Item = risingwave_connector::error::ConnectorResult<SourceReaderEvent>>
46 + Send
47 + 'static,
48) -> impl SourceChunkStream {
49 stream
50 .try_filter_map(|event| async move {
51 Ok(match event {
52 SourceReaderEvent::DataChunk(chunk) => Some(chunk),
53 SourceReaderEvent::SplitProgress(_) => None,
54 })
55 })
56 .boxed()
57}
58
/// Executor that fetches and parses files for a batch POSIX-filesystem source.
///
/// It receives file assignments (file name + JSON-serialized split) from the
/// upstream list executor, reads the files from local disk in batches, parses
/// each line into stream chunks, and emits the chunks downstream.
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    // Streaming source state; `take`n out of the Option when the stream starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    // Upstream executor providing file assignments; `take`n when the stream starts.
    upstream: Option<Executor>,

    // Optional rate limit (rows per second); feeds into the source chunk size.
    rate_limit_rps: Option<u32>,

    barrier_manager: LocalBarrierManager,

    // Pending files to fetch: (file name, JSON-serialized split).
    file_queue: VecDeque<(String, JsonbVal)>,

    // Table associated with this source; reported when a refresh load finishes.
    associated_table_id: TableId,
}

/// Parsed contents of one file, yielded by the batched stream reader.
struct FileData {
    // All chunks parsed from the file (empty for an empty file).
    chunks: Vec<StreamChunk>,

    // File name relative to the configured root path.
    file_path: String,
}

impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
    /// Creates the executor.
    ///
    /// # Panics
    /// Panics if `associated_table_id` is `None` — callers are required to
    /// supply it for this batch source variant.
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        barrier_manager: LocalBarrierManager,
        associated_table_id: Option<TableId>,
    ) -> Self {
        assert!(associated_table_id.is_some());
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            barrier_manager,
            file_queue: VecDeque::new(),
            associated_table_id: associated_table_id.unwrap(),
        }
    }

    /// Pops up to [`BATCH_SIZE`] splits off `file_queue` and installs a new
    /// batched reader as the data side of `stream`. If the queue is empty, a
    /// pending (never-ready) stream is installed instead. `files_in_progress`
    /// is incremented by the number of splits handed to the new reader.
    fn replace_with_new_batch_reader<const BIASED: bool>(
        files_in_progress: &mut usize,
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, FileData>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(BATCH_SIZE);

        // Drain up to BATCH_SIZE queued splits, deserializing each from JSON.
        for _ in 0..BATCH_SIZE {
            if let Some((_file_path, split_json)) = file_queue.pop_front() {
                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
                batch.push(split);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            // Nothing to read: park the data side so only upstream messages flow.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *files_in_progress += batch.len();
            let batch_reader =
                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

    /// Reads each split's file from local disk (under the configured root),
    /// parses it line-by-line, and yields one [`FileData`] per file.
    ///
    /// Unreadable files are logged and skipped; empty files yield a
    /// `FileData` with no chunks so progress accounting still advances.
    #[try_stream(ok = FileData, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) {
        // This reader is only built for the BatchPosixFs connector variant.
        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
            unreachable!()
        };

        let root_path = batch_posix_fs_properties.root.clone();

        for split in batch {
            let file_path = split.name.clone();
            let full_path = Path::new(&root_path).join(&file_path);

            let content = match fs::read(&full_path).await {
                Ok(content) => content,
                Err(e) => {
                    // Best-effort: log and skip unreadable files rather than
                    // failing the whole batch.
                    tracing::error!(
                        error = %e.as_report(),
                        file_path = %full_path.display(),
                        "Failed to read file"
                    );
                    continue;
                }
            };

            if content.is_empty() {
                // Still emit an (empty) FileData so files_in_progress is decremented.
                yield FileData {
                    chunks: vec![],
                    file_path,
                };
                continue;
            }

            let mut chunks = vec![];

            // Parse each line as an independent source message with offset "0".
            for line in content.lines() {
                let line =
                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;

                let message = SourceMessage {
                    key: None,
                    payload: Some(line.as_bytes().to_vec()),
                    offset: "0".to_owned(),
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                };

                // NOTE(review): a fresh parser is constructed per line. If the
                // parser is stateless across messages this could likely be
                // hoisted out of the loop — confirm before changing.
                let parser =
                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
                        .await?;

                let chunk_stream = into_data_chunk_stream(parser.parse_stream_with_events(
                    Box::pin(futures::stream::once(async {
                        Ok(SourceMessageEvent::Data(vec![message]))
                    })),
                ));

                #[for_await]
                for chunk in chunk_stream {
                    chunks.push(chunk?);
                }
            }

            yield FileData { chunks, file_path };
        }
    }

    /// Main executor loop.
    ///
    /// Merges the upstream message stream (barriers + file-assignment chunks)
    /// with the batched file-reader stream, tracks refresh lifecycle state
    /// (`RefreshStart` / `ListFinish` mutations), and reports "source load
    /// finished" to the barrier manager once all queued files are processed
    /// at a checkpoint barrier during a refresh.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        let properties = source_desc.source.config.clone();
        let parser_config = ParserConfig {
            common: CommonParserConfig {
                rw_columns: source_desc.columns.clone(),
            },
            specific: source_desc.source.parser_config.clone(),
        };

        // Number of files handed to the current batch reader but not yet
        // yielded back as FileData.
        let mut files_in_progress: usize = 0;
        // Start with a pending data side; a real reader is installed on barriers.
        let mut stream =
            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Refresh lifecycle flags: `list_finished` is set by the ListFinish
        // mutation; `is_refreshing` is set by RefreshStart and cleared after
        // the load-finished report.
        let mut list_finished = false;
        let mut is_refreshing = false;
        let mut file_queue = self.file_queue;

        let actor_ctx = self.actor_ctx.clone();
        let barrier_manager = self.barrier_manager.clone();
        let rate_limit_rps = &mut self.rate_limit_rps;

        let source_ctx = Arc::new(SourceContext::new(
            actor_ctx.id,
            core.source_id,
            actor_ctx.fragment_id,
            core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(*rate_limit_rps),
                split_txn: rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        ));

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // NOTE(review): resetting to 0 here while FileData items
                    // from the old reader may still arrive could underflow the
                    // later `files_in_progress -= 1` — verify the reader is
                    // always replaced after an error.
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    files_in_progress = 0;
                }
                Ok(msg) => match msg {
                    // Left side: upstream messages (barriers / file assignments).
                    Either::Left(msg) => match msg {
                        Message::Barrier(barrier) => {
                            // Currently always false; kept as a hook for forcing
                            // a reader rebuild on specific mutations.
                            let need_rebuild_reader = false;

                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    // A refresh targeting this source: drop all
                                    // queued/in-flight work and start over.
                                    Mutation::RefreshStart {
                                        associated_source_id,
                                        ..
                                    } if associated_source_id.as_raw_id()
                                        == core.source_id.as_raw_id() =>
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = %actor_ctx.id,
                                            source_id = %core.source_id,
                                            queue_len = file_queue.len(),
                                            files_in_progress,
                                            "RefreshStart: clearing state and aborting workload"
                                        );

                                        file_queue.clear();
                                        files_in_progress = 0;
                                        list_finished = false;
                                        is_refreshing = true;

                                        // Abort the current reader by parking
                                        // the data side.
                                        stream.replace_data_stream(stream::pending().boxed());
                                    }
                                    Mutation::ListFinish {
                                        associated_source_id,
                                    } => {
                                        if associated_source_id.as_raw_id()
                                            == core.source_id.as_raw_id()
                                        {
                                            tracing::info!(
                                                ?barrier.epoch,
                                                actor_id = %actor_ctx.id,
                                                source_id = %core.source_id,
                                                "received ListFinish mutation"
                                            );
                                            list_finished = true;
                                        }
                                    }
                                    _ => (),
                                }
                            }

                            let epoch = barrier.epoch;

                            // All files listed and processed during a refresh:
                            // report completion at the next checkpoint barrier.
                            if files_in_progress == 0
                                && file_queue.is_empty()
                                && list_finished
                                && is_refreshing
                                && barrier.is_checkpoint()
                            {
                                tracing::info!(
                                    ?epoch,
                                    actor_id = %actor_ctx.id,
                                    source_id = %core.source_id,
                                    "Reporting source load finished"
                                );
                                barrier_manager.report_source_load_finished(
                                    epoch,
                                    actor_ctx.id,
                                    self.associated_table_id,
                                    core.source_id,
                                );
                                list_finished = false;
                                is_refreshing = false;
                            }

                            yield Message::Barrier(barrier);

                            // Once the previous batch has fully drained, pull
                            // the next batch of files from the queue.
                            if files_in_progress == 0 || need_rebuild_reader {
                                Self::replace_with_new_batch_reader(
                                    &mut files_in_progress,
                                    &mut file_queue,
                                    &mut stream,
                                    properties.clone(),
                                    parser_config.clone(),
                                    source_ctx.clone(),
                                )?;
                            }
                        }
                        // Upstream chunks carry file assignments:
                        // column 0 = file name, column 1 = serialized split.
                        Message::Chunk(chunk) => {
                            for row in chunk.data_chunk().rows() {
                                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                                let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                                file_queue.push_back((file_name, split));
                            }

                            tracing::debug!(
                                actor_id = %actor_ctx.id,
                                queue_len = file_queue.len(),
                                "Added file assignments to queue"
                            );
                        }
                        Message::Watermark(_) => unreachable!(),
                    },
                    // Right side: a fully parsed file from the batch reader.
                    Either::Right(FileData { chunks, file_path }) => {
                        files_in_progress -= 1;
                        tracing::debug!(
                            file_path = ?file_path,
                            "Processed file"
                        );

                        for chunk in chunks {
                            // Drop the split/offset bookkeeping columns before
                            // emitting downstream.
                            let chunk = prune_additional_cols(
                                &chunk,
                                &[split_idx, offset_idx],
                                &source_desc.columns,
                            );
                            yield Message::Chunk(chunk);
                        }
                    }
                },
            }
        }
    }
}

impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
    // Entry point: converts the executor into its boxed message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

450impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
451 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
452 if let Some(core) = &self.stream_source_core {
453 f.debug_struct("BatchPosixFsFetchExecutor")
454 .field("source_id", &core.source_id)
455 .field("column_ids", &core.column_ids)
456 .finish()
457 } else {
458 f.debug_struct("BatchPosixFsFetchExecutor").finish()
459 }
460 }
461}