use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

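/// Fetches the Iceberg data files assigned by the upstream list executor, reads them into
/// stream chunks, and tracks per-file progress in the source state table so that unfinished
/// files are fetched again after recovery.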
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source core, including the state table that persists the assigned file scan tasks.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that assigns data files (as persisted `FileScanTask`s) to this actor.
    upstream: Option<Executor>,

    /// Rate limit in rows per second; can be updated at runtime via `Mutation::Throttle`.
    rate_limit_rps: Option<u32>,

    /// Streaming config, used for the chunk size and the fetch batch size.
    streaming_config: Arc<StreamingConfig>,
}

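/// All chunks read from a single data file, together with the state needed to mark that file
/// as finished.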
struct ChunksWithState {
    /// The stream chunks read from one data file.
    chunks: Vec<StreamChunk>,

    /// The path of the data file; also the key under which its task is persisted in the state table.
    data_file_path: String,

    /// The position of the last row read from the file; not consumed anywhere yet.
    #[expect(dead_code)]
    last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;
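
    /// A serializable mirror of [`FileScanTask`], persisted as JSONB in the source state table
    /// so that unfinished scan tasks survive recovery. [`Self::encode`] and [`Self::decode`]
    /// convert between the two representations field by field.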
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The byte offset in the data file where the scan starts.
        pub start: u64,
        /// The number of bytes to scan starting from `start`.
        pub length: u64,
        /// The number of records in the file, if known.
        pub record_count: Option<u64>,

        /// The full path of the data file.
        pub data_file_path: String,

        /// The content type of the file (data, position deletes, or equality deletes).
        pub data_file_content: DataContentType,

        /// The format of the file, e.g. Parquet.
        pub data_file_format: DataFileFormat,

        /// The schema used to read the file.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate used to filter rows, if any.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The delete files that apply to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number of the data file.
        pub sequence_number: i64,
        /// The equality ids of equality delete files.
        pub equality_ids: Vec<i32>,

        /// The size of the data file in bytes.
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
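        /// Deserializes a [`FileScanTask`] from the JSONB value stored in the state table.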
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

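        /// Serializes a [`FileScanTask`] into a JSONB value for persistence in the state table.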
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }
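
        // A minimal sketch of the round trip through the state table (illustrative only; the
        // actual reads and writes go through `SourceStateTableHandler` in the executor below,
        // and `task` / `jsonb_ref` are placeholder names):
        //
        //     let jsonb: JsonbVal = PersistedFileScanTask::encode(task);          // on assignment
        //     let task: FileScanTask = PersistedFileScanTask::decode(jsonb_ref)?; // on fetch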

        /// Converts the persisted task back to a [`FileScanTask`], recursively converting its
        /// delete-file tasks.
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::to_task)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Converts a [`FileScanTask`] into its persisted form, recursively converting its
        /// delete-file tasks.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

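    /// Scans up to `iceberg_fetch_batch_size` persisted file scan tasks from the state table
    /// (across this actor's vnodes) and replaces the data stream with a reader over that batch.
    /// If no tasks are pending, a pending stream is installed instead.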
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // The second column of the state table stores the persisted task as JSONB.
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

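    /// Reads the given batch of [`FileScanTask`]s from the Iceberg table and, for each file,
    /// yields all of its chunks as one [`ChunksWithState`] so the caller can mark the file as
    /// finished.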
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // Record the file path and position of the last row read; the reader is expected to
            // yield at least one chunk per task.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

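    /// Builds the [`SourceContext`] for the batch reader, with the chunk size capped by the
    /// current rate limit.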
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

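    /// The main loop of the executor. It
    /// - persists file assignments received from the upstream list executor into the state table,
    /// - reads pending files in batches and emits their chunks downstream, with the hidden
    ///   file-path/position columns pruned and a placeholder row-id column inserted, and
    /// - on each barrier, commits the state table, applies pause/resume/throttle mutations, and
    ///   rebuilds the batch reader when it is exhausted or the configuration changed.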
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Locate the hidden file-path/position columns and the row-id column in the output schema.
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Build the initial batch reader from any tasks already persisted in the state table
        // (e.g. after recovery).
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // Messages and barriers from the upstream list executor.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // The vnode bitmap changed, so the current reader may
                                        // hold files that no longer belong to this actor.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // New file assignments from the list executor: persist each
                                    // (file path, task) pair into the state table.
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks read from a fully consumed data file.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // The whole file has been read; drop its task from the state table
                            // so it will not be fetched again after recovery.
                            splits_on_fetch -= 1;
                            state_store_handler.delete(&data_file_path).await?;

                            for chunk in &chunks {
                                // Drop the hidden file-path/position columns and insert a
                                // placeholder row-id column expected by the output schema.
                                let chunk = prune_additional_cols(
                                    chunk,
                                    file_path_idx,
                                    file_pos_idx,
                                    &source_desc.columns,
                                );
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}