use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;
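/// Fetches chunks from Iceberg data files.
///
/// This executor receives `FileScanTask`s from the upstream (list) executor,
/// persists them in the state table, and reads the corresponding data files
/// out as stream chunks.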
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core of the source executor, including the split state table.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that produces the file scan tasks to fetch.
    upstream: Option<Executor>,

    /// Rate limit in rows per second, if configured.
    rate_limit_rps: Option<u32>,

    streaming_config: Arc<StreamingConfig>,
}
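/// Chunks read from one data file, together with the state needed to mark the
/// file as finished.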
pub(crate) struct ChunksWithState {
    /// The chunks read from the data file.
    pub chunks: Vec<StreamChunk>,

    /// Path of the data file these chunks were read from; used as the key to
    /// delete the corresponding task from the state table.
    pub data_file_path: String,

    /// Position of the last read row in the file. Currently unused, since the
    /// state is only deleted once the whole file has been read.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}
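// A `FileScanTask` is persisted in the state table as a JSONB value keyed by
// its data file path; `PersistedFileScanTask` below is the serde schema of
// that value.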
pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;
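
    /// The persisted, serializable form of a [`FileScanTask`], stored as a
    /// JSONB value in the fetch executor's state table.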
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// Byte offset at which the scan starts.
        pub start: u64,
        /// Number of bytes to scan.
        pub length: u64,
        /// Number of records in the file, if known.
        pub record_count: Option<u64>,

        /// Path of the data file.
        pub data_file_path: String,

        /// Content type of the file (data, position deletes, or equality deletes).
        pub data_file_content: DataContentType,

        /// Format of the data file (e.g. Parquet).
        pub data_file_format: DataFileFormat,

        /// Iceberg schema used for the scan.
        pub schema: SchemaRef,
        /// Field ids to project.
        pub project_field_ids: Vec<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// Delete files that apply to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// Data sequence number of the file.
        pub sequence_number: i64,
        /// Field ids used by equality deletes, if any.
        pub equality_ids: Option<Vec<i32>>,

        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// Deserializes a task from the JSONB value stored in the state table.
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// Serializes a task into a JSONB value for the state table.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts the persisted form back into a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                // Fields that are not persisted are left empty.
                partition: None,
                partition_spec: None,
                name_mapping: None,
            }
        }

        /// Converts a [`FileScanTask`] into its persisted form, dropping the
        /// fields that are not serialized.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Like [`Self::from_task`], but clones the fields out of a shared task.
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

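    /// Scans the state table for persisted file scan tasks and replaces the
    /// data stream with a reader over the next batch of at most
    /// `iceberg_fetch_batch_size` tasks. If no task is found, the data stream
    /// is replaced with a pending stream.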
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            // No pending tasks: park the data stream until new tasks arrive from upstream.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

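    /// Reads the given batch of file scan tasks one by one and, for each data
    /// file, yields the resulting chunks together with the file path used to
    /// clean up its state afterwards.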
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                    handle_delete_files: table.metadata().format_version()
                        >= iceberg::spec::FormatVersion::V3,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // The last row of the last chunk records how far this file has been read.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

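    /// Main loop: persists file scan tasks received from the upstream executor,
    /// fetches the persisted files in batches, and emits the resulting chunks
    /// downstream, handling barriers, throttling and pause/resume along the way.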
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // Build the initial reader from any tasks already persisted in the state
        // table, e.g. after recovery.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // Messages and barriers from the upstream executor.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // The vnode bitmap has changed; force a rebuild of the
                                        // reader from the state table below.
                                        splits_on_fetch = 0;
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // New file scan tasks from the upstream executor:
                                    // persist them into the state table.
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks fetched from a data file.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // The whole file has been read; drop its state so it
                            // will not be fetched again.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    &[file_path_idx, file_pos_idx],
                                    &source_desc.columns,
                                );
                                let (chunk, op) = chunk.into_parts();
                                // Insert a placeholder (all-NULL) row-id column so the chunk
                                // matches the output schema.
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::zeros(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}