risingwave_stream/executor/source/fs_fetch_executor.rs

use std::marker::PhantomData;
use std::ops::Bound;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::filesystem::opendal_source::{
    OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{
    BoxStreamingFileSourceChunkStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{
    SourceStateTableHandler, StreamSourceCore,
    apply_rate_limit_with_for_streaming_file_source_reader, get_split_offset_col_idx,
    get_split_offset_mapping_from_chunk, prune_additional_cols,
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

const SPLIT_BATCH_SIZE: usize = 1000;

type SplitBatch = Option<Vec<SplitImpl>>;

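/// Fetches the contents of filesystem-like sources (S3, GCS, Azure Blob, POSIX fs).
/// It receives file assignments from the upstream list executor, persists them in a
/// state table, and replays each file as a stream of chunks; a `None` marker in the
/// data stream signals that the current file has been fully read.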
pub struct FsFetchExecutor<S: StateStore, Src: OpendalSource> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    _marker: PhantomData<Src>,
}

impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            _marker: PhantomData,
        }
    }

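    /// Scans the state table across all owned vnodes, restores up to `SPLIT_BATCH_SIZE`
    /// pending file splits, and replaces the data stream with one reader per file,
    /// chained together. When there are no pending files, the data stream is parked on
    /// `stream::pending()` until new assignments arrive.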
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, Option<StreamChunk>>,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<()> {
        let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            let properties = source_desc.source.config.clone();
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // The split is stored as JSONB; restore it according to the connector type.
                let split = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => match properties {
                        risingwave_connector::source::ConnectorProperties::Gcs(_) => {
                            let split: OpendalFsSplit<OpendalGcs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::OpendalS3(_) => {
                            let split: OpendalFsSplit<OpendalS3> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::Azblob(_) => {
                            let split: OpendalFsSplit<OpendalAzblob> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
                            let split: OpendalFsSplit<OpendalPosixFs> =
                                OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
                            SplitImpl::from(split)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };
                batch.push(split);

                if batch.len() >= SPLIT_BATCH_SIZE {
                    break 'vnodes;
                }
            }
        }
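        // No pending files: park the data stream; otherwise chain one reader per file.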
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();

            let mut merged_stream =
                stream::empty::<StreamExecutorResult<Option<StreamChunk>>>().boxed();
            // Build a dedicated reader for each file so that every file's sub-stream
            // terminates with its own `None` end-of-file marker.
            for split in batch {
                let single_file_stream = Self::build_single_file_stream_reader(
                    column_ids.clone(),
                    source_ctx.clone(),
                    source_desc,
                    Some(vec![split]),
                    rate_limit_rps,
                )
                .await?
                .map_err(StreamExecutorError::connector_error);
                merged_stream = merged_stream.chain(single_file_stream).boxed();
            }

            stream.replace_data_stream(merged_stream);
        }

        Ok(())
    }

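    /// Builds a reader over a single file split. Chunks are wrapped in `Some` and the
    /// stream is terminated with an extra `Ok(None)` so the caller can tell when the
    /// file has been exhausted; the configured rate limit is applied on top.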
    async fn build_single_file_stream_reader(
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: &SourceDesc,
        batch: SplitBatch,
        rate_limit_rps: Option<u32>,
    ) -> StreamExecutorResult<BoxStreamingFileSourceChunkStream> {
        let (stream, _) = source_desc
            .source
            .build_stream(batch, column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        let optional_stream: BoxStreamingFileSourceChunkStream = stream
            .map(|item| item.map(Some))
            .chain(stream::once(async { Ok(None) }))
            .boxed();
        Ok(
            apply_rate_limit_with_for_streaming_file_source_reader(optional_stream, rate_limit_rps)
                .boxed(),
        )
    }

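    /// Assembles the `SourceContext` for a reader, wiring in the actor and fragment
    /// identity, source metrics, and chunking options derived from the current rate limit.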
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

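    /// Main executor loop: takes the first barrier, restores any file assignments from
    /// the state table, then multiplexes upstream messages (barriers and new file
    /// assignments) with chunks read from the current batch of file readers.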
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, Option<StreamChunk>>::new(
            upstream,
            stream::pending().boxed(),
        );
        if is_pause_on_startup {
            stream.pause_stream();
        }

        // On a recovery startup, the state table may already contain file assignments,
        // so try building a reader right away.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            &source_desc,
            &mut stream,
            self.rate_limit_rps,
        )
        .await?;
        // Name of the file currently being read; its state entry is deleted on EOF.
        let mut reading_file: Arc<str> = "".into();

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::info!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    // Force a reader rebuild so the new
                                                    // rate limit takes effect.
                                                    splits_on_fetch = 0;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // If the cache may be stale, the vnode mapping has
                                        // changed, so the reader must be rebuilt.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            &source_desc,
                                            &mut stream,
                                            self.rate_limit_rps,
                                        )
                                        .await?;
                                    }
                                }
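                                // New file assignments from the upstream list executor;
                                // persist them into the state table.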
                                Message::Chunk(chunk) => {
                                    let file_assignment = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let filename = row.datum_at(0).unwrap().into_utf8();
                                            let size = row.datum_at(2).unwrap().into_int64();
                                            OpendalFsSplit::<Src>::new(
                                                filename.to_owned(),
                                                0,
                                                size as usize,
                                            )
                                        })
                                        .collect();
                                    state_store_handler.set_states(file_assignment).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
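                        // Chunks (or `None` end-of-file markers) emitted by the
                        // per-file readers.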
                        Either::Right(optional_chunk) => match optional_chunk {
                            Some(chunk) => {
                                let mapping = get_split_offset_mapping_from_chunk(
                                    &chunk, split_idx, offset_idx,
                                )
                                .unwrap();
                                debug_assert_eq!(mapping.len(), 1);
                                if let Some((split_id, _offset)) = mapping.into_iter().next() {
                                    reading_file = split_id.clone();
                                    let row = state_store_handler
                                        .get(&split_id)
                                        .await?
                                        .unwrap_or_else(|| {
                                            panic!(
                                                "The fs_split (file_name) {:?} should be in the state table.",
                                                split_id
                                            )
                                        });
                                    let fs_split = match row.datum_at(1) {
                                        Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                                            OpendalFsSplit::<Src>::restore_from_json(
                                                jsonb_ref.to_owned_scalar(),
                                            )?
                                        }
                                        _ => unreachable!(),
                                    };

                                    state_store_handler
                                        .set(&split_id, fs_split.encode_to_json())
                                        .await?;
                                }
                                let chunk = prune_additional_cols(
                                    &chunk,
                                    split_idx,
                                    offset_idx,
                                    &source_desc.columns,
                                );
                                yield Message::Chunk(chunk);
                            }
                            None => {
                                // `None` marks the end of the current file: drop its
                                // state entry and decrement the in-flight split count.
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&reading_file).await?;
                            }
                        },
                    }
                }
            }
        }
    }
}

impl<S: StateStore, Src: OpendalSource> Execute for FsFetchExecutor<S, Src> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore, Src: OpendalSource> Debug for FsFetchExecutor<S, Src> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("FsFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("FsFetchExecutor").finish()
        }
    }
}