// risingwave_stream/executor/source/fs_fetch_executor.rs

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::ColumnId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

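/// `FsFetchExecutor` reads the contents of files assigned by the upstream list executor.
/// It persists pending file splits in the source state table, builds a streaming reader
/// per file, emits the decoded chunks downstream, and records per-file offsets so that
/// fetching can resume after recovery.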
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core source state, including the split state table handler.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows per second, if any.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

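    /// Scan the state table for pending file splits (at most `SPLIT_BATCH_SIZE` at a time),
    /// build one streaming reader per file, and install the chained reader as the data side
    /// of `stream`. If no pending split is found, a pending stream is installed instead.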
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // Each state table row stores the split encoded as JSONB; restore it with
                // the concrete split type of the connector in use.
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            // Nothing to fetch yet: park the data side until new splits arrive.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            // Chain one single-file reader per split so files are consumed one after another.
            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

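    /// Build a streaming reader for a single file split. The returned stream yields
    /// `Some(chunk)` for data and appends a final `Ok(None)` as an end-of-file marker,
    /// so the caller can tell when the file has been fully consumed.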
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

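    /// Assemble the `SourceContext` used when building per-file readers, propagating the
    /// configured rate limit into the chunk size and `split_txn` options.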
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

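    /// Main execution loop: merge barriers and file assignments from the upstream list
    /// executor with chunks read from the files themselves, persisting per-file offsets
    /// in the state table along the way.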
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx), _) =
            get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Build the initial batch reader from the splits recovered from the state table.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;

        // Name of the file currently being read, used to delete its split once finished.
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.clone(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    // Drop the current reader state; a new batch reader will be built on
                    // the next barrier.
                    splits_on_fetch = 0;
                    reading_file = None;
                }
                Ok(msg) => {
                    match msg {
                        // Messages from the upstream list executor: barriers and chunks of
                        // newly listed files.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    // Force a rebuild so the new rate limit
                                                    // takes effect for subsequent readers.
                                                    splits_on_fetch = 0;
                                                    reading_file = None;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // The vnode bitmap was updated (e.g. scale-in/out), so
                                        // the current reader no longer matches the assignment.
                                        splits_on_fetch = 0;
                                        reading_file = None;
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // A chunk from the list executor contains newly discovered
                                    // files as (filename, ..., size) rows. Skip empty files and
                                    // persist the rest as pending splits.
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks read from the file currently being fetched. `None` is the
                        // end-of-file marker appended by `build_single_file_stream_reader`.
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    // Record how far this file has been read, so fetching can
                                    // resume from here after recovery.
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    &[split_idx, offset_idx],
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                // The current file is exhausted: drop its split from the
                                // state table and decrement the in-flight split count.
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}