// risingwave_stream/executor/source/fs_fetch_executor.rs

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

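/// Fetches the contents of files for an OpenDAL-backed file source.
///
/// File assignments arrive as chunks from the upstream list executor and are persisted in a
/// state table; the files are then read one by one, with the current read offset checkpointed
/// so that fetching can resume after recovery.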
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Core state of the source: source id/name, column ids, source descriptor builder,
    /// and the split state table.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that lists the files to fetch.
    upstream: Option<Executor>,

    /// Rate limit (rows per second), if any.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

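    /// Scan the split state table vnode by vnode and rebuild the data-side stream from the
    /// persisted splits, reading at most `SPLIT_BATCH_SIZE` files in one batch. If no splits
    /// are found, the data side is replaced with a pending stream.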
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
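            // Build a reader for each file in the batch and chain them together, so files
            // are read sequentially; each per-file stream ends with a `None` marker (see
            // `build_single_file_stream_reader`).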
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

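    /// Build a stream that reads the given batch of splits (in practice, a single file).
    /// Each data chunk is wrapped in `Some(..)` and a trailing `Ok(None)` is appended to
    /// mark the end of the file, then the configured rate limit is applied on top.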
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

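    /// Main loop: merge the upstream (barriers and file assignments from the list executor)
    /// with the data stream read from the files, checkpointing read progress in the split
    /// state table.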
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

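        // Try to build a reader from any splits already persisted in the state table
        // (e.g. right after recovery); otherwise the data side stays pending until
        // file assignments arrive from upstream.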
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
        let mut reading_file: Option<Arc<str>> = None;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(
                        source_id = %core.source_id,
                        source_name = %core.source_name,
                        fragment_id = %self.actor_ctx.fragment_id,
                        error = %e.as_report(),
                        "Fetch Error"
                    );
                    GLOBAL_ERROR_METRICS.user_source_error.report([
                        "File source fetch error".to_owned(),
                        core.source_id.to_string(),
                        core.source_name.to_owned(),
                        self.actor_ctx.fragment_id.to_string(),
                    ]);
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
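                            // Messages from the upstream list executor: barriers and
                            // chunks of newly listed files.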
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

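                                    // If the vnode bitmap was updated (e.g. by scaling),
                                    // the splits currently being read may no longer belong
                                    // to this actor; reset `splits_on_fetch` so the reader
                                    // is rebuilt below.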
                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
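                                // A chunk from the list executor is a set of file
                                // assignments: column 0 holds the file name and column 2
                                // its size. Empty files are skipped; the rest are stored
                                // in the state table as splits starting at offset 0.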
                                Message::Chunk(chunk) => {
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .filter_map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();

                                            if size > 0 {
                                                Some(OpendalFsSplit::<Src>::new(
                                                    filename.to_owned(),
                                                    0,
                                                    size as usize,
                                                ))
                                            } else {
                                                None
                                            }
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        Either::Right(optional_chunk) => match optional_chunk {
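                            // Output of the file reader.
                            // `Some(chunk)` carries data from the file currently being
                            // read: persist its latest offset so fetching can resume
                            // after recovery, then emit the chunk downstream.
                            // `None` marks the end of a file: drop its entry from the
                            // state table and decrement `splits_on_fetch`.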
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, offset)) = mapping.into_iter().next() {
                                    reading_file = Some(split_id.clone());
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let mut fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };
                                    fs_split.update_offset(offset)?;

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    split_idx,
                                    offset_idx,
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                tracing::debug!("Deleting file: {:?}", reading_file);
                                if let Some(ref delete_file_name) = reading_file {
                                    splits_on_fetch -= 1;
                                    state_store_handler.delete(delete_file_name).await?;
                                }
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}