risingwave_stream/executor/source/fs_fetch_executor.rs

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

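/// Fetches the content of files from an OpenDAL-backed file source (S3, GCS, Azure Blob, or
/// POSIX fs). File assignments arrive from the upstream list executor as chunks, are persisted
/// as splits in a state table, and are then read file by file and emitted as stream chunks.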
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    stream_source_core: Option<StreamSourceCore<S>>,

    upstream: Option<Executor>,

    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

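    /// Scan the split state table across the owned vnodes (at most `SPLIT_BATCH_SIZE` splits),
    /// build a chunk reader for every pending file, and install the chained readers as the new
    /// data stream of `stream`. When nothing is pending, a forever-pending stream is installed.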
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            for split in batch {
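                // Each split corresponds to one file: build a dedicated reader for it and
                // append it to the merged stream so the files are read one after another.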
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

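    /// Build the chunk stream for a single file split. Each chunk is wrapped in `Some`, and a
    /// trailing `Ok(None)` is appended as an end-of-file marker so the consumer can tell when
    /// the file has been fully read. Rate limiting is applied on top of the resulting stream.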
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

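    /// Assemble the `SourceContext` used when building file readers, with the chunk size capped
    /// by the current rate limit.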
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

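    /// Main loop: drive the upstream messages and the file data stream through a single
    /// `StreamReaderWithPause`, persisting fetch progress to the split state table on barriers.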
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

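        // Build the initial batch reader from whatever splits are already persisted in the
        // state table; later assignments arrive from upstream and are handled in the loop below.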
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
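                                // A chunk from upstream carries newly listed files (name and
                                // size); persist every non-empty file as a split in the state
                                // table so it will be fetched.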
                                Message::Chunk(chunk) => {
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
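                        // Output of the file readers: `Some(chunk)` carries rows read from the
                        // file currently being fetched, while `None` is the end-of-file marker
                        // appended by `build_single_file_stream_reader`.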
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}