use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

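/// Fetches rows from Iceberg data files according to the file scan tasks received from upstream.
///
/// The upstream executor emits `(data_file_path, task JSONB)` rows, which are persisted in this
/// executor's state table. On startup and whenever the current batch is exhausted (or the rate
/// limit changes), a new batched reader is built from the pending tasks, its chunks are streamed
/// downstream, and each task is removed from the state table once its file has been fully read.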
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core of the streaming source, holding the state table that persists the file scan tasks.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that assigns the file scan tasks to fetch.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    streaming_config: Arc<StreamingConfig>,
}

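/// Chunks read from a single Iceberg data file, together with the state needed to mark the
/// corresponding scan task as finished.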
pub(crate) struct ChunksWithState {
    /// The stream chunks read from the file.
    pub chunks: Vec<StreamChunk>,

    /// The path of the data file the chunks were read from.
    pub data_file_path: String,

    /// The position of the last row read from the file.
    /// Unused for now: a file is always read to completion before its state entry is deleted.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

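    /// A serializable mirror of [`FileScanTask`].
    ///
    /// This is the structure that is actually persisted in the state table (as JSONB), so that
    /// pending scan tasks can be restored after recovery.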
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file, if known.
        pub record_count: Option<u64>,

        /// The path of the data file.
        pub data_file_path: String,

        /// The content type of the file (data, position deletes or equality deletes).
        pub data_file_content: DataContentType,

        /// The format of the data file.
        pub data_file_format: DataFileFormat,

        /// The schema of the data file.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter the rows, if any.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The delete files that may need to be applied to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number of the data file.
        pub sequence_number: i64,
        /// The equality field ids of the file.
        pub equality_ids: Vec<i32>,

        /// The size of the data file in bytes.
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// Decodes a JSONB value from the state table back into a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// Encodes a [`FileScanTask`] into the JSONB value to be persisted in the state table.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts the persisted task back into a [`FileScanTask`],
        /// recursively converting its delete files.
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Converts a [`FileScanTask`] into the persisted representation,
        /// recursively converting its delete files.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Like [`Self::from_task`], but starts from a shared `Arc<FileScanTask>` and clones the
        /// fields instead of consuming the task.
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

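    /// Replaces the current data stream with a new batched reader built from the pending file
    /// scan tasks in the state table (at most `iceberg_fetch_batch_size` tasks). If there are no
    /// pending tasks, the data stream is parked on a pending stream until new tasks arrive.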
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

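    /// Reads the given file scan tasks one by one and yields a [`ChunksWithState`] per data
    /// file, recording the file path (and last read position) from the hidden
    /// `ICEBERG_FILE_PATH_COLUMN_NAME` / `ICEBERG_FILE_POS_COLUMN_NAME` columns.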
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                    handle_delete_files: false,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

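    /// The main loop of the executor:
    /// - persists the file scan tasks received from the upstream executor into the state table;
    /// - on barriers, commits the state table and rebuilds the batch reader when it is exhausted
    ///   or the rate limit changes;
    /// - forwards the chunks read from data files downstream, pruning the hidden columns and
    ///   inserting a placeholder row-id column.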
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Build the initial reader from any tasks already persisted in the state table
        // (e.g. after recovery).
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // Messages from the upstream executor: barriers and chunks of
                        // newly-assigned file scan tasks.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // The vnode bitmap has changed, so the set of pending
                                        // tasks owned by this actor may have changed as well;
                                        // force a rebuild of the reader below.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // The upstream chunk carries `(data_file_path, task JSONB)`
                                    // pairs; persist them so they can be fetched later.
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks read from a data file by the batch reader.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // A data file is always read to completion before its chunks are
                            // yielded, so its state entry can be deleted right away. Persisting
                            // partial progress via `last_read_pos` is not supported yet.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    &[file_path_idx, file_pos_idx],
                                    &source_desc.columns,
                                );
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                // Insert a placeholder row-id column (all zeros) so that the
                                // chunk matches the downstream schema; real row ids are assigned
                                // later.
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}