risingwave_stream/executor/source/fs_fetch_executor.rs

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

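/// Fetches and reads the files assigned by the upstream list executor, emitting their
/// contents as stream chunks. Per-file read progress is tracked in the split state table
/// so that reading can resume after recovery.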
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core state of the source executor: split state table, source description
    /// builder, column ids, and source identity.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that produces file assignments.
    upstream: Option<Executor>,

    /// Rate limit in rows per second, if any.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

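    /// Scans the split state table (up to `SPLIT_BATCH_SIZE` pending files across the owned
    /// vnodes) and replaces the data stream with readers for those files. If no file is
    /// pending, the data stream is replaced with a pending (never-ready) stream.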
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
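            // Build one reader per file and chain the per-file streams, so the files in
            // the batch are read sequentially.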
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

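    /// Builds a rate-limited reader stream for a single file. The stream yields `Some(chunk)`
    /// for each data chunk and a trailing `None` to signal that the file has been fully read.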
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

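    /// Assembles the `SourceContext` used when building a reader, propagating the actor,
    /// fragment, and rate-limit settings.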
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

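    /// Main loop: persists file assignments arriving from the upstream list executor,
    /// reads the assigned files, commits progress on barriers, and rebuilds the reader
    /// whenever there is nothing left to fetch or the split assignment changes.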
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

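        // On recovery the state table may already contain file assignments, so try to
        // build a reader before entering the main loop.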
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    splits_on_fetch = 0;
                    reading_file = None;
                }
                Ok(msg) => {
                    match msg {
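                        // Messages (barriers and file assignments) from the upstream
                        // list executor.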
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(new_rate_limit) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    // Force a reader rebuild below so the new
                                                    // rate limit takes effect.
                                                    splits_on_fetch = 0;
                                                    reading_file = None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // The vnode bitmap changed, so the splits owned by this
                                        // actor may have changed: rebuild the reader.
                                        splits_on_fetch = 0;
                                        reading_file = None;
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
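                                // New file assignments from the upstream list executor:
                                // persist them into the split state table.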
                                Message::Chunk(chunk) => {
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
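                        // Chunks read from the current file; a `None` marks the end of
                        // that file.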
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}