risingwave_stream/executor/source/batch_source/batch_posix_fs_fetch.rs

use std::collections::VecDeque;
use std::io::BufRead;
use std::path::Path;

use either::Either;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::id::TableId;
use risingwave_common::types::{JsonbVal, ScalarRef};
use risingwave_connector::parser::{ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::OpendalPosixFs;
use risingwave_connector::source::{
    ConnectorProperties, SourceContext, SourceCtrlOpts, SourceMessage, SourceMeta, SplitMetaData,
};
use thiserror_ext::AsReport;
use tokio::fs;

use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::{StreamSourceCore, get_split_offset_col_idx, prune_additional_cols};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;

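/// Maximum number of file splits handed to a single batched reader at a time.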
const BATCH_SIZE: usize = 1000;

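/// Executor that consumes file assignments from the upstream list executor, reads each
/// assigned file from the local POSIX filesystem, parses it, and emits the parsed chunks.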
pub struct BatchPosixFsFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source state; taken when the executor starts running.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that produces file assignments.
    upstream: Option<Executor>,

    /// Optional rate limit in rows per second.
    rate_limit_rps: Option<u32>,

    /// Used to report that the source load has finished.
    barrier_manager: LocalBarrierManager,

    /// Queue of pending file assignments: (file name, JSON-encoded split).
    file_queue: VecDeque<(String, JsonbVal)>,

    /// Table associated with this source, used when reporting load completion.
    associated_table_id: TableId,
}

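/// Parsed output of a single file read by the batched reader.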
struct FileData {
    /// Stream chunks produced by parsing the file.
    chunks: Vec<StreamChunk>,

    /// Path of the file, relative to the connector's root directory.
    file_path: String,
}

impl<S: StateStore> BatchPosixFsFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        barrier_manager: LocalBarrierManager,
        associated_table_id: Option<TableId>,
    ) -> Self {
        assert!(associated_table_id.is_some());
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            barrier_manager,
            file_queue: VecDeque::new(),
            associated_table_id: associated_table_id.unwrap(),
        }
    }

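    /// Pop up to `BATCH_SIZE` splits from the file queue and replace the data stream with a
    /// reader over them. If the queue is empty, park the data side on a pending stream.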
    fn replace_with_new_batch_reader<const BIASED: bool>(
        files_in_progress: &mut usize,
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, FileData>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(BATCH_SIZE);

        for _ in 0..BATCH_SIZE {
            if let Some((_file_path, split_json)) = file_queue.pop_front() {
                let split = OpendalFsSplit::<OpendalPosixFs>::restore_from_json(split_json)?;
                batch.push(split);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *files_in_progress += batch.len();
            let batch_reader =
                Self::build_batched_stream_reader(batch, properties, parser_config, source_ctx);
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

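    /// Read each split's file from the local filesystem under the connector's configured root,
    /// parse it line by line, and yield the resulting chunks per file.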
    #[try_stream(ok = FileData, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        batch: Vec<OpendalFsSplit<OpendalPosixFs>>,
        properties: ConnectorProperties,
        parser_config: ParserConfig,
        source_ctx: Arc<SourceContext>,
    ) {
        let ConnectorProperties::BatchPosixFs(batch_posix_fs_properties) = properties else {
            unreachable!()
        };

        let root_path = batch_posix_fs_properties.root.clone();

        for split in batch {
            let file_path = split.name.clone();
            let full_path = Path::new(&root_path).join(&file_path);

            let content = match fs::read(&full_path).await {
                Ok(content) => content,
                Err(e) => {
                    tracing::error!(
                        error = %e.as_report(),
                        file_path = %full_path.display(),
                        "Failed to read file"
                    );
                    continue;
                }
            };

            if content.is_empty() {
                // Yield an empty `FileData` so the file is still counted as processed.
                yield FileData {
                    chunks: vec![],
                    file_path,
                };
                continue;
            }

            let mut chunks = vec![];

            for line in content.lines() {
                let line =
                    line.map_err(|e| StreamExecutorError::connector_error(anyhow::Error::from(e)))?;

                let message = SourceMessage {
                    key: None,
                    payload: Some(line.as_bytes().to_vec()),
                    // Offsets are not tracked per line; every message carries a constant offset.
                    offset: "0".to_owned(),
                    split_id: split.id(),
                    meta: SourceMeta::Empty,
                };

                let parser =
                    ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone())
                        .await?;

                let chunk_stream = parser
                    .parse_stream(Box::pin(futures::stream::once(async { Ok(vec![message]) })));

                #[for_await]
                for chunk in chunk_stream {
                    chunks.push(chunk?);
                }
            }

            yield FileData { chunks, file_path };
        }
    }

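    /// Main executor loop: forward barriers, enqueue file assignments arriving from the upstream
    /// list executor, drain parsed file data, and report load completion once a refresh finishes.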
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        let properties = source_desc.source.config.clone();
        let parser_config = ParserConfig {
            common: CommonParserConfig {
                rw_columns: source_desc.columns.clone(),
            },
            specific: source_desc.source.parser_config.clone(),
        };

        let mut files_in_progress: usize = 0;
        let mut stream =
            StreamReaderWithPause::<true, FileData>::new(upstream, stream::pending().boxed());

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Whether the upstream listing has finished, and whether a refresh is currently in
        // progress. Both must hold before load completion can be reported.
        let mut list_finished = false;
        let mut is_refreshing = false;
        let mut file_queue = self.file_queue;

        let actor_ctx = self.actor_ctx.clone();
        let barrier_manager = self.barrier_manager.clone();
        let rate_limit_rps = &mut self.rate_limit_rps;

        let source_ctx = Arc::new(SourceContext::new(
            actor_ctx.id,
            core.source_id,
            actor_ctx.fragment_id,
            core.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(*rate_limit_rps),
                split_txn: rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        ));

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    files_in_progress = 0;
                }
                Ok(msg) => match msg {
                    Either::Left(msg) => match msg {
                        Message::Barrier(barrier) => {
                            let need_rebuild_reader = false;

                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    Mutation::RefreshStart {
                                        associated_source_id,
                                        ..
                                    } if associated_source_id.as_raw_id()
                                        == core.source_id.as_raw_id() =>
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = %actor_ctx.id,
                                            source_id = %core.source_id,
                                            queue_len = file_queue.len(),
                                            files_in_progress,
                                            "RefreshStart: clearing state and aborting workload"
                                        );

                                        // Drop queued and in-flight work so the refresh starts
                                        // from a clean slate.
                                        file_queue.clear();
                                        files_in_progress = 0;
                                        list_finished = false;
                                        is_refreshing = true;

                                        stream.replace_data_stream(stream::pending().boxed());
                                    }
                                    Mutation::ListFinish {
                                        associated_source_id,
                                    } => {
                                        if associated_source_id.as_raw_id()
                                            == core.source_id.as_raw_id()
                                        {
                                            tracing::info!(
                                                ?barrier.epoch,
                                                actor_id = %actor_ctx.id,
                                                source_id = %core.source_id,
                                                "received ListFinish mutation"
                                            );
                                            list_finished = true;
                                        }
                                    }
                                    _ => (),
                                }
                            }

                            let epoch = barrier.epoch;

                            // All assigned files have been read, the listing has finished, and a
                            // refresh is in progress: on a checkpoint barrier, report that the
                            // source load has finished.
                            if files_in_progress == 0
                                && file_queue.is_empty()
                                && list_finished
                                && is_refreshing
                                && barrier.is_checkpoint()
                            {
                                tracing::info!(
                                    ?epoch,
                                    actor_id = %actor_ctx.id,
                                    source_id = %core.source_id,
                                    "Reporting source load finished"
                                );
                                barrier_manager.report_source_load_finished(
                                    epoch,
                                    actor_ctx.id,
                                    self.associated_table_id,
                                    core.source_id,
                                );
                                list_finished = false;
                                is_refreshing = false;
                            }

                            yield Message::Barrier(barrier);

                            // The current batch is exhausted (or a rebuild was requested): pull
                            // the next batch of files from the queue.
                            if files_in_progress == 0 || need_rebuild_reader {
                                Self::replace_with_new_batch_reader(
                                    &mut files_in_progress,
                                    &mut file_queue,
                                    &mut stream,
                                    properties.clone(),
                                    parser_config.clone(),
                                    source_ctx.clone(),
                                )?;
                            }
                        }
                        Message::Chunk(chunk) => {
                            // Upstream chunks carry file assignments: column 0 is the file name,
                            // column 1 is the JSON-encoded split.
                            for row in chunk.data_chunk().rows() {
                                let file_name = row.datum_at(0).unwrap().into_utf8().to_owned();
                                let split = row.datum_at(1).unwrap().into_jsonb().to_owned_scalar();
                                file_queue.push_back((file_name, split));
                            }

                            tracing::debug!(
                                actor_id = %actor_ctx.id,
                                queue_len = file_queue.len(),
                                "Added file assignments to queue"
                            );
                        }
                        Message::Watermark(_) => unreachable!(),
                    },
                    Either::Right(FileData { chunks, file_path }) => {
                        files_in_progress -= 1;
                        tracing::debug!(
                            file_path = ?file_path,
                            "Processed file"
                        );

                        for chunk in chunks {
                            // Strip the internal split/offset columns before emitting downstream.
                            let chunk = prune_additional_cols(
                                &chunk,
                                &[split_idx, offset_idx],
                                &source_desc.columns,
                            );
                            yield Message::Chunk(chunk);
                        }
                    }
                },
            }
        }
    }
}

impl<S: StateStore> Execute for BatchPosixFsFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchPosixFsFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchPosixFsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchPosixFsFetchExecutor").finish()
        }
    }
}