1use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25 ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
32use risingwave_connector::source::reader::desc::SourceDesc;
33use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
34use risingwave_pb::common::ThrottleType;
35use risingwave_storage::store::PrefetchOptions;
36use thiserror_ext::AsReport;
37
38use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
39use crate::common::rate_limit::limited_chunk_size;
40use crate::executor::prelude::*;
41use crate::executor::stream_reader::StreamReaderWithPause;
42
/// Streaming executor that reads the actual data of Iceberg file-scan tasks.
///
/// It receives persisted [`FileScanTask`]s from the upstream list executor,
/// stores them in a state table, and scans the corresponding data files,
/// emitting the rows as stream chunks.
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    // Core source state; taken (becomes `None`) when the stream loop starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    // Upstream executor delivering new file-scan tasks; taken on startup.
    upstream: Option<Executor>,

    // Rate limit (rows per second, per the `_rps` suffix); `None` = unlimited.
    rate_limit_rps: Option<u32>,

    // Streaming config, e.g. chunk size and iceberg fetch batch size.
    streaming_config: Arc<StreamingConfig>,
}
64
/// All chunks scanned from a single data file, plus the state needed to record
/// progress for that file.
pub(crate) struct ChunksWithState {
    // The stream chunks produced from the file (all `Insert` ops).
    pub chunks: Vec<StreamChunk>,

    // Path of the data file the chunks came from; used as the state-table key
    // when marking the file as done.
    pub data_file_path: String,

    // Last read position within the file. Currently unused — files appear to be
    // consumed whole (see `into_stream`, which ignores it) — hence `dead_code`.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}
80
pub(super) use state::PersistedFileScanTask;
/// State (de)serialization: a [`FileScanTask`] is persisted in the state table
/// as JSONB via the serde-friendly mirror type [`PersistedFileScanTask`].
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    /// Default for `case_sensitive` when deserializing state persisted before
    /// the field existed.
    fn default_case_sensitive() -> bool {
        true
    }
    /// Serde mirror of [`FileScanTask`]. Keep field names stable: they define
    /// the JSON layout of already-persisted executor state.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// Byte offset at which the scan starts within the data file.
        pub start: u64,
        /// Number of bytes to scan.
        pub length: u64,
        /// Number of records in the task, if known.
        pub record_count: Option<u64>,

        /// Path of the data file to scan.
        pub data_file_path: String,

        /// Content type of the file (data / position deletes / equality deletes).
        pub data_file_content: DataContentType,

        /// On-disk format of the data file (e.g. Parquet).
        pub data_file_format: DataFileFormat,

        /// Iceberg schema of the file.
        pub schema: SchemaRef,
        /// Field ids to project when reading.
        pub project_field_ids: Vec<i32>,
        /// Optional predicate to push down; omitted from JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// Delete-file tasks that apply to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// Sequence number of the data file.
        pub sequence_number: i64,
        /// Equality ids, for equality-delete files.
        pub equality_ids: Option<Vec<i32>>,

        /// Total file size in bytes.
        pub file_size_in_bytes: u64,

        /// Whether name matching is case sensitive; defaults to `true` for
        /// state written before this field was added.
        #[serde(default = "default_case_sensitive")]
        pub case_sensitive: bool,
    }

    impl PersistedFileScanTask {
        /// Decodes a JSONB state-table value back into a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            // `take()` extracts the inner `serde_json::Value` from the owned jsonb.
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// Encodes a [`FileScanTask`] into a JSONB value for the state table.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts the persisted form back into a runtime [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                // Delete tasks are recursively reconstructed and re-wrapped in `Arc`.
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                // These fields are not persisted; presumably not needed after
                // recovery — TODO confirm against the scan path.
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive,
            }
        }

        /// Converts a runtime [`FileScanTask`] into the persisted form,
        /// dropping the non-serialized fields (see `to_task`).
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }
        }

        /// Like `from_task`, but starts from an `Arc`'d task (the shape used
        /// for delete files), cloning the shared fields.
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
                case_sensitive: task.case_sensitive,
            }
        }
    }
}
270
271impl<S: StateStore> IcebergFetchExecutor<S> {
272 pub fn new(
273 actor_ctx: ActorContextRef,
274 stream_source_core: StreamSourceCore<S>,
275 upstream: Executor,
276 rate_limit_rps: Option<u32>,
277 streaming_config: Arc<StreamingConfig>,
278 ) -> Self {
279 Self {
280 actor_ctx,
281 stream_source_core: Some(stream_source_core),
282 upstream: Some(upstream),
283 rate_limit_rps,
284 streaming_config,
285 }
286 }
287
    #[expect(clippy::too_many_arguments)]
    /// Rebuilds the data-side stream from the tasks persisted in the state table.
    ///
    /// Scans all owned vnodes, decodes up to `iceberg_fetch_batch_size` tasks,
    /// and replaces the current data stream with a reader over that batch.
    /// When no tasks are pending, installs a pending stream instead.
    /// `splits_on_fetch` is increased by the number of tasks handed to the reader.
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        // Full scan over every owned vnode; the label lets us stop as soon as
        // the batch is full.
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // Column 1 holds the JSONB-encoded `PersistedFileScanTask`.
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            // Nothing to fetch: park the data side until new tasks arrive.
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }
345
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    /// Reads each [`FileScanTask`] in `batch` and yields one [`ChunksWithState`]
    /// per task, carrying all chunks of that file plus progress metadata.
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        // Locate the hidden file-path / file-pos columns used for progress tracking.
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true, need_file_path_and_pos: true,
                    // Delete files are handled inline only for format v3+ tables.
                    handle_delete_files: table.metadata().format_version()
                        >= iceberg::spec::FormatVersion::V3,
                },
                None,
            ) {
                let chunk = chunk?;
                // All rows from a scan are inserts.
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // NOTE(review): panics if a task yields zero chunks (e.g. an empty
            // data file) — assumes every task produces at least one chunk; TODO
            // confirm `scan_task_to_chunk_with_deletes` guarantees this.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }
411
412 fn build_source_ctx(
413 &self,
414 source_desc: &SourceDesc,
415 source_id: SourceId,
416 source_name: &str,
417 ) -> SourceContext {
418 SourceContext::new(
419 self.actor_ctx.id,
420 source_id,
421 self.actor_ctx.fragment_id,
422 source_name.to_owned(),
423 source_desc.metrics.clone(),
424 SourceCtrlOpts {
425 chunk_size: limited_chunk_size(self.rate_limit_rps),
426 split_txn: self.rate_limit_rps.is_some(), },
428 source_desc.source.config.clone(),
429 None,
430 )
431 }
432
433 #[try_stream(ok = Message, error = StreamExecutorError)]
434 async fn into_stream(mut self) {
435 let mut upstream = self.upstream.take().unwrap().execute();
436 let barrier = expect_first_barrier(&mut upstream).await?;
437 let first_epoch = barrier.epoch;
438 let is_pause_on_startup = barrier.is_pause_on_startup();
439 yield Message::Barrier(barrier);
440
441 let mut core = self.stream_source_core.take().unwrap();
442 let mut state_store_handler = core.split_state_store;
443
444 let source_desc_builder = core.source_desc_builder.take().unwrap();
446
447 let source_desc = source_desc_builder
448 .build()
449 .map_err(StreamExecutorError::connector_error)?;
450
451 let file_path_idx = source_desc
452 .columns
453 .iter()
454 .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
455 .unwrap();
456 let file_pos_idx = source_desc
457 .columns
458 .iter()
459 .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
460 .unwrap();
461 let row_id_idx = source_desc
463 .columns
464 .iter()
465 .position(|c| c.name == ROW_ID_COLUMN_NAME)
466 .unwrap();
467 tracing::trace!(
468 "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
469 source_desc.columns,
470 file_path_idx,
471 file_pos_idx,
472 row_id_idx
473 );
474 state_store_handler.init_epoch(first_epoch).await?;
476
477 let mut splits_on_fetch: usize = 0;
478 let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
479 upstream,
480 stream::pending().boxed(),
481 );
482
483 if is_pause_on_startup {
484 stream.pause_stream();
485 }
486
487 Self::replace_with_new_batch_reader(
491 &mut splits_on_fetch,
492 &state_store_handler, core.column_ids.clone(),
494 self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
495 source_desc.clone(),
496 &mut stream,
497 self.rate_limit_rps,
498 self.streaming_config.clone(),
499 )
500 .await?;
501
502 while let Some(msg) = stream.next().await {
503 match msg {
504 Err(e) => {
505 tracing::error!(error = %e.as_report(), "Fetch Error");
506 splits_on_fetch = 0;
507 }
508 Ok(msg) => {
509 match msg {
510 Either::Left(msg) => {
512 match msg {
513 Message::Barrier(barrier) => {
514 let mut need_rebuild_reader = false;
515
516 if let Some(mutation) = barrier.mutation.as_deref() {
517 match mutation {
518 Mutation::Pause => stream.pause_stream(),
519 Mutation::Resume => stream.resume_stream(),
520 Mutation::Throttle(fragment_to_apply) => {
521 if let Some(entry) = fragment_to_apply
522 .get(&self.actor_ctx.fragment_id)
523 && entry.throttle_type() == ThrottleType::Source
524 && entry.rate_limit != self.rate_limit_rps
525 {
526 tracing::debug!(
527 "updating rate limit from {:?} to {:?}",
528 self.rate_limit_rps,
529 entry.rate_limit
530 );
531 self.rate_limit_rps = entry.rate_limit;
532 need_rebuild_reader = true;
533 }
534 }
535 _ => (),
536 }
537 }
538
539 let post_commit = state_store_handler
540 .commit_may_update_vnode_bitmap(barrier.epoch)
541 .await?;
542
543 let update_vnode_bitmap =
544 barrier.as_update_vnode_bitmap(self.actor_ctx.id);
545 yield Message::Barrier(barrier);
547
548 if post_commit
549 .post_yield_barrier(update_vnode_bitmap)
550 .await?
551 .is_some()
552 {
553 splits_on_fetch = 0;
557 }
558
559 if splits_on_fetch == 0 || need_rebuild_reader {
560 Self::replace_with_new_batch_reader(
561 &mut splits_on_fetch,
562 &state_store_handler,
563 core.column_ids.clone(),
564 self.build_source_ctx(
565 &source_desc,
566 core.source_id,
567 &core.source_name,
568 ),
569 source_desc.clone(),
570 &mut stream,
571 self.rate_limit_rps,
572 self.streaming_config.clone(),
573 )
574 .await?;
575 }
576 }
577 Message::Chunk(chunk) => {
580 let jsonb_values: Vec<(String, JsonbVal)> = chunk
581 .data_chunk()
582 .rows()
583 .map(|row| {
584 let file_name = row.datum_at(0).unwrap().into_utf8();
585 let split = row.datum_at(1).unwrap().into_jsonb();
586 (file_name.to_owned(), split.to_owned_scalar())
587 })
588 .collect();
589 state_store_handler.set_states_json(jsonb_values).await?;
590 state_store_handler.try_flush().await?;
591 }
592 Message::Watermark(_) => unreachable!(),
593 }
594 }
595 Either::Right(ChunksWithState {
597 chunks,
598 data_file_path,
599 last_read_pos: _,
600 }) => {
601 if true {
603 splits_on_fetch -= 1;
604 state_store_handler.delete(&data_file_path).await?;
605 }
606
607 for chunk in &chunks {
608 let chunk = prune_additional_cols(
609 chunk,
610 &[file_path_idx, file_pos_idx],
611 &source_desc.columns,
612 );
613 let (chunk, op) = chunk.into_parts();
615 let (mut columns, visibility) = chunk.into_parts();
616 columns.insert(
617 row_id_idx,
618 Arc::new(
619 SerialArray::from_iter_bitmap(
620 itertools::repeat_n(Serial::from(0), columns[0].len()),
621 Bitmap::zeros(columns[0].len()),
622 )
623 .into(),
624 ),
625 );
626 let chunk = StreamChunk::from_parts(
627 op,
628 DataChunk::from_parts(columns.into(), visibility),
629 );
630
631 yield Message::Chunk(chunk);
632 }
633 }
634 }
635 }
636 }
637 }
638 }
639}
640
641impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
642 fn execute(self: Box<Self>) -> BoxedMessageStream {
643 self.into_stream().boxed()
644 }
645}
646
647impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
648 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
649 if let Some(core) = &self.stream_source_core {
650 f.debug_struct("IcebergFetchExecutor")
651 .field("source_id", &core.source_id)
652 .field("column_ids", &core.column_ids)
653 .finish()
654 } else {
655 f.debug_struct("IcebergFetchExecutor").finish()
656 }
657 }
658}