use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use pin_project::pin_project;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
    source_reader_event_to_chunk_stream,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;
const MAX_RETRIES_PER_SPLIT: u32 = 3;
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200);

type SplitBatch = Option<Vec<SplitImpl>>;

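/// Bundled arguments for [`FsFetchExecutor::replace_with_new_batch_reader`], grouping the
/// mutable fetch state and the inputs needed to construct new split readers.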
struct ReplaceReaderArgs<'a, S: StateStore, const BIASED: bool> {
    splits_on_fetch: &'a mut usize,
    state_store_handler: &'a SourceStateTableHandler<S>,
    dirty_splits: &'a HashSet<Arc<str>>,
    column_ids: Vec<ColumnId>,
    source_ctx: SourceContext,
    source_desc: &'a SourceDesc,
    stream: &'a mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
    rate_limit_rps: Option<u32>,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
}

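/// A stream adapter that records which split is being read: on the first poll it stores its
/// `split_id` into the shared `reading_file` slot, so the error path can tell which file a
/// failure came from.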
#[pin_project]
struct SetReadingFileOnPoll<S> {
    #[pin]
    inner: S,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
    split_id: Arc<str>,
    is_set: bool,
}

impl<S> SetReadingFileOnPoll<S> {
    fn new(inner: S, reading_file: Arc<Mutex<Option<Arc<str>>>>, split_id: Arc<str>) -> Self {
        Self {
            inner,
            reading_file,
            split_id,
            is_set: false,
        }
    }
}

impl<S> futures::Stream for SetReadingFileOnPoll<S>
where
    S: futures::Stream,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        if !*this.is_set {
            *this.reading_file.lock().expect("mutex poisoned") = Some(this.split_id.clone());
            *this.is_set = true;
        }
        this.inner.poll_next(cx)
    }
}

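/// Fetch executor for file sources. It pairs with an upstream list executor: file
/// assignments arrive as upstream chunks, are persisted as splits in the state table, and
/// are then read batch by batch, with failing splits retried and eventually marked dirty.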
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

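    /// Core state of the source executor: source id/name, column ids, and the split state
    /// table handler.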
    stream_source_core: Option<StreamSourceCore<S>>,

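    /// Upstream list executor providing the files to fetch.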
    upstream: Option<Executor>,

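    /// Rate limit in rows/s, if configured.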
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

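    /// Scan the state table for pending splits (skipping dirty ones), chain per-file readers
    /// over up to `SPLIT_BATCH_SIZE` of them, and swap the result into `stream`. When no
    /// splits remain, a pending stream is installed instead.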
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        args: ReplaceReaderArgs<'_, S, BIASED>,
    ) -> StreamExecutorResult<()> {
        let ReplaceReaderArgs {
            splits_on_fetch,
            state_store_handler,
            dirty_splits,
            column_ids,
            source_ctx,
            source_desc,
            stream,
            rate_limit_rps,
            reading_file,
        } = args;
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match &source_desc.source.config {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                let split_id = split.id();
                if dirty_splits.contains(&split_id) {
                    continue;
                }
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            for split in batch {
                let split_id = split.id();
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                let single_file_stream =
                    SetReadingFileOnPoll::new(single_file_stream, reading_file.clone(), split_id)
                        .boxed();
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

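    /// Build a rate-limited chunk stream reading a single split. The stream yields
    /// `Some(chunk)` items followed by a final `Ok(None)` marker, so callers can distinguish
    /// end-of-file from ordinary chunk boundaries.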
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream =
            source_reader_event_to_chunk_stream(stream)
                .boxed()
                .map(|item| item.map(Some))
                .chain(stream::once(async { Ok(None) }))
                .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

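    /// Build a `SourceContext` for this actor, with the chunk size capped by the current
    /// rate limit.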
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

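    /// Main loop: merges upstream messages (barriers and file assignments) with chunks read
    /// from file splits. Failed splits are retried with exponential backoff and marked dirty
    /// once `MAX_RETRIES_PER_SPLIT` is exceeded.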
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let actor_id = self.actor_ctx.id.to_string();
        let fragment_id = self.actor_ctx.fragment_id.to_string();
        let source_id = core.source_id.to_string();
        let source_name = core.source_name.clone();
        let dirty_split_count_metrics = source_desc
            .metrics
            .file_source_dirty_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        let failed_split_count_metrics = source_desc
            .metrics
            .file_source_failed_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        dirty_split_count_metrics.set(0);

        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        state_store_handler.init_epoch(first_epoch).await?;

        let reading_file: Arc<Mutex<Option<Arc<str>>>> = Arc::new(Mutex::new(None));
        let mut retry_counts: HashMap<Arc<str>, u32> = HashMap::new();
        let mut dirty_splits: HashSet<Arc<str>> = HashSet::new();

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

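        // Build the initial batch reader from splits recovered in the state table.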
        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
            splits_on_fetch: &mut splits_on_fetch,
            state_store_handler: &state_store_handler,
            dirty_splits: &dirty_splits,
            column_ids: core.column_ids.clone(),
            source_ctx: self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc: &source_desc,
            stream: &mut stream,
            rate_limit_rps: self.rate_limit_rps,
            reading_file: reading_file.clone(),
        })
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                    let Some(split_id) = cur_file else {
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            error = %e.as_report(),
                            "fetch error, but the file being read could not be determined; aborting actor"
                        );
                        return Err(e);
                    };

                    let retries_done = retry_counts.entry(split_id.clone()).or_insert(0);
                    if *retries_done < MAX_RETRIES_PER_SPLIT {
                        *retries_done = retries_done.saturating_add(1);
                        let backoff = RETRY_BASE_BACKOFF
                            .checked_mul(1u32 << (*retries_done - 1))
                            .unwrap_or(Duration::from_secs(60));
                        tracing::warn!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            retries_done = *retries_done,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "fetch error, retrying file split"
                        );
                        tokio::time::sleep(backoff).await;
                    } else {
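                        // Retries exhausted: mark the split dirty so future batch readers skip it.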
                        dirty_splits.insert(split_id.clone());
                        dirty_split_count_metrics.set(dirty_splits.len() as i64);
                        failed_split_count_metrics.inc();
                        retry_counts.remove(&split_id);
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "fetch error, exceeded max retries; marking split dirty and skipping"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            "File source dirty split".to_owned(),
                            core.source_id.to_string(),
                            core.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                    }

                    *reading_file.lock().expect("mutex poisoned") = None;
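                    // Rebuild the batch reader from the state table, skipping dirty splits.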
                    splits_on_fetch = 0;
                    Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                        splits_on_fetch: &mut splits_on_fetch,
                        state_store_handler: &state_store_handler,
                        dirty_splits: &dirty_splits,
                        column_ids: core.column_ids.clone(),
                        source_ctx: self.build_source_ctx(
                            &source_desc,
                            core.source_id,
                            &core.source_name,
                        ),
                        source_desc: &source_desc,
                        stream: &mut stream,
                        rate_limit_rps: self.rate_limit_rps,
                        reading_file: reading_file.clone(),
                    })
                    .await?;
                    continue;
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    splits_on_fetch = 0;
                                                    *reading_file
                                                        .lock()
                                                        .expect("mutex poisoned") = None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);
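                                    // `Some` here means the vnode bitmap changed (e.g. scaling),
                                    // so the current reader state is reset and rebuilt below.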
                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        splits_on_fetch = 0;
                                        *reading_file.lock().expect("mutex poisoned") = None;
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                                            splits_on_fetch: &mut splits_on_fetch,
                                            state_store_handler: &state_store_handler,
                                            dirty_splits: &dirty_splits,
                                            column_ids: core.column_ids.clone(),
                                            source_ctx: self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc: &source_desc,
                                            stream: &mut stream,
                                            rate_limit_rps: self.rate_limit_rps,
                                            reading_file: reading_file.clone(),
                                        })
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
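                                    // A chunk from the upstream list executor: each row carries a
                                    // discovered file name and its size. Persist non-empty files
                                    // as new splits in the state table.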
                                    let file_assignment: Vec<OpendalFsSplit<Src>> = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();

                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        Either::Right(optional_chunk) => match optional_chunk {
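                            // A chunk read from the current file split: persist the new offset
                            // and emit the chunk downstream. `None` marks end-of-file for the
                            // current split (see `build_single_file_stream_reader`).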
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    *reading_file.lock().expect("mutex poisoned") =
                                        Some(split_id.clone());
                                    retry_counts.remove(&split_id);
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
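                                // End of the current file: remove its split from the state table
                                // and clear its retry bookkeeping.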
                                let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                                tracing::debug!("Deleting file: {:?}", cur_file);
                                if let Some(ref delete_file_name) = cur_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                    retry_counts.remove(delete_file_name);
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}