use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use pin_project::pin_project;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

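// `SPLIT_BATCH_SIZE` caps how many splits are loaded from the state table into one
// batch reader. A failing split is retried up to `MAX_RETRIES_PER_SPLIT` times with
// exponential backoff starting at `RETRY_BASE_BACKOFF` before being marked dirty.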
const SPLIT_BATCH_SIZE: usize = 1000;
const MAX_RETRIES_PER_SPLIT: u32 = 3;
const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200);

type SplitBatch = Option<Vec<SplitImpl>>;

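/// Arguments to [`FsFetchExecutor::replace_with_new_batch_reader`], bundled into a
/// struct to keep the call sites readable.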
struct ReplaceReaderArgs<'a, S: StateStore, const BIASED: bool> {
    splits_on_fetch: &'a mut usize,
    state_store_handler: &'a SourceStateTableHandler<S>,
    dirty_splits: &'a HashSet<Arc<str>>,
    column_ids: Vec<ColumnId>,
    source_ctx: SourceContext,
    source_desc: &'a SourceDesc,
    stream: &'a mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
    rate_limit_rps: Option<u32>,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
}

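/// Stream adapter that records `split_id` into the shared `reading_file` slot the first
/// time the inner stream is polled, so the error path can tell which file a failure
/// came from.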
#[pin_project]
struct SetReadingFileOnPoll<S> {
    #[pin]
    inner: S,
    reading_file: Arc<Mutex<Option<Arc<str>>>>,
    split_id: Arc<str>,
    is_set: bool,
}

impl<S> SetReadingFileOnPoll<S> {
    fn new(inner: S, reading_file: Arc<Mutex<Option<Arc<str>>>>, split_id: Arc<str>) -> Self {
        Self {
            inner,
            reading_file,
            split_id,
            is_set: false,
        }
    }
}

impl<S> futures::Stream for SetReadingFileOnPoll<S>
where
    S: futures::Stream,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        if !*this.is_set {
            *this.reading_file.lock().expect("mutex poisoned") = Some(this.split_id.clone());
            *this.is_set = true;
        }
        this.inner.poll_next(cx)
    }
}

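/// Fetch executor for file sources: receives file assignments (name and size) from the
/// upstream list executor, persists them as splits in the state table, and streams out
/// the file contents split by split.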
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core structure of the streaming source.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream (list) executor.
    upstream: Option<Executor>,

    /// Rate limit in rows per second.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

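    /// Scans the state table for up to `SPLIT_BATCH_SIZE` splits (skipping dirty ones),
    /// builds one reader per split, and installs the chained readers as the new data
    /// stream; installs a pending stream when there is nothing left to fetch.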
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        args: ReplaceReaderArgs<'_, S, BIASED>,
    ) -> StreamExecutorResult<()> {
        let ReplaceReaderArgs {
            splits_on_fetch,
            state_store_handler,
            dirty_splits,
            column_ids,
            source_ctx,
            source_desc,
            stream,
            rate_limit_rps,
            reading_file,
        } = args;
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match &source_desc.source.config {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                let split_id = split.id();
                if dirty_splits.contains(&split_id) {
                    continue;
                }
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            for split in batch {
                let split_id = split.id();
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                let single_file_stream =
                    SetReadingFileOnPoll::new(single_file_stream, reading_file.clone(), split_id)
                        .boxed();
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

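    /// Builds a rate-limited reader over a single split. Items are wrapped in `Some`
    /// and a trailing `Ok(None)` is appended as an end-of-file marker, so the caller
    /// can tell when the file has been fully consumed.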
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

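    /// Builds the executor stream: takes the first barrier, restores state, then
    /// multiplexes upstream messages (barriers, file assignments) with chunks read
    /// from the current batch of file splits, retrying failed splits with backoff
    /// and skipping them once `MAX_RETRIES_PER_SPLIT` is exceeded.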
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let actor_id = self.actor_ctx.id.to_string();
        let fragment_id = self.actor_ctx.fragment_id.to_string();
        let source_id = core.source_id.to_string();
        let source_name = core.source_name.clone();
        let dirty_split_count_metrics = source_desc
            .metrics
            .file_source_dirty_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        let failed_split_count_metrics = source_desc
            .metrics
            .file_source_failed_split_count
            .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
        dirty_split_count_metrics.set(0);

        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        state_store_handler.init_epoch(first_epoch).await?;

        let reading_file: Arc<Mutex<Option<Arc<str>>>> = Arc::new(Mutex::new(None));
        let mut retry_counts: HashMap<Arc<str>, u32> = HashMap::new();
        let mut dirty_splits: HashSet<Arc<str>> = HashSet::new();

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
            splits_on_fetch: &mut splits_on_fetch,
            state_store_handler: &state_store_handler,
            dirty_splits: &dirty_splits,
            column_ids: core.column_ids.clone(),
            source_ctx: self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc: &source_desc,
            stream: &mut stream,
            rate_limit_rps: self.rate_limit_rps,
            reading_file: reading_file.clone(),
        })
        .await?;

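        // Main loop: `Either::Left` carries upstream messages, `Either::Right` carries
        // chunks (or EOF `None`s) from the file reader.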
        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // A read error: attribute it to the file currently being read, if known.
                    let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                    let Some(split_id) = cur_file else {
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            error = %e.as_report(),
                            "fetch error, but failed to infer which file was being read; aborting actor"
                        );
                        return Err(e);
                    };

                    let retries_done = retry_counts.entry(split_id.clone()).or_insert(0);
                    if *retries_done < MAX_RETRIES_PER_SPLIT {
                        *retries_done = retries_done.saturating_add(1);
                        // Exponential backoff: 200ms, 400ms, 800ms, ...; capped at 60s on overflow.
                        let backoff = RETRY_BASE_BACKOFF
                            .checked_mul(1u32 << (*retries_done - 1))
                            .unwrap_or(Duration::from_secs(60));
                        tracing::warn!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            retries_done = *retries_done,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "fetch error, retrying file split"
                        );
                        tokio::time::sleep(backoff).await;
                    } else {
                        // Out of retries: mark the split dirty so it is skipped from now on.
                        dirty_splits.insert(split_id.clone());
                        dirty_split_count_metrics.set(dirty_splits.len() as i64);
                        failed_split_count_metrics.inc();
                        retry_counts.remove(&split_id);
                        tracing::error!(
                            source_id = %core.source_id,
                            source_name = %core.source_name,
                            fragment_id = %self.actor_ctx.fragment_id,
                            reading_file = %split_id,
                            max_retries = MAX_RETRIES_PER_SPLIT,
                            error = %e.as_report(),
                            "fetch error, exceeded max retries; marking split dirty and skipping"
                        );
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            "File source dirty split".to_owned(),
                            core.source_id.to_string(),
                            core.source_name.clone(),
                            self.actor_ctx.fragment_id.to_string(),
                        ]);
                    }

                    // Rebuild the batch reader from scratch, skipping dirty splits.
                    *reading_file.lock().expect("mutex poisoned") = None;
                    splits_on_fetch = 0;
                    Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                        splits_on_fetch: &mut splits_on_fetch,
                        state_store_handler: &state_store_handler,
                        dirty_splits: &dirty_splits,
                        column_ids: core.column_ids.clone(),
                        source_ctx: self.build_source_ctx(
                            &source_desc,
                            core.source_id,
                            &core.source_name,
                        ),
                        source_desc: &source_desc,
                        stream: &mut stream,
                        rate_limit_rps: self.rate_limit_rps,
                        reading_file: reading_file.clone(),
                    })
                    .await?;
                    continue;
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
                            match msg {
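                                // Barrier: apply config changes, commit source state, and
                                // refill the batch reader if it has been drained.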
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    splits_on_fetch = 0;
                                                    *reading_file.lock().expect("mutex poisoned") =
                                                        None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // The vnode bitmap changed (e.g. scaling); reset so the
                                        // reader is rebuilt against the new vnode set.
                                        splits_on_fetch = 0;
                                        *reading_file.lock().expect("mutex poisoned") = None;
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(ReplaceReaderArgs {
                                            splits_on_fetch: &mut splits_on_fetch,
                                            state_store_handler: &state_store_handler,
                                            dirty_splits: &dirty_splits,
                                            column_ids: core.column_ids.clone(),
                                            source_ctx: self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc: &source_desc,
                                            stream: &mut stream,
                                            rate_limit_rps: self.rate_limit_rps,
                                            reading_file: reading_file.clone(),
                                        })
                                        .await?;
                                    }
                                }
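                                // New file assignments from the upstream list executor;
                                // persist them as splits in the state table.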
                                Message::Chunk(chunk) => {
                                    let file_assignment: Vec<OpendalFsSplit<Src>> = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            // Skip empty files: there is nothing to fetch.
                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();

                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
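                        // Data (or EOF) from the file reader. Each chunk comes from exactly
                        // one split; `None` means the current file has been read to the end
                        // and its split can be removed from the state table.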
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    *reading_file.lock().expect("mutex poisoned") =
                                        Some(split_id.clone());
                                    retry_counts.remove(&split_id);
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "the fs_split (file_name) {:?} should be in the state table",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                let cur_file = reading_file.lock().expect("mutex poisoned").clone();
                                tracing::debug!("deleting file: {:?}", cur_file);
                                if let Some(ref delete_file_name) = cur_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                    retry_counts.remove(delete_file_name);
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}