use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// Fetches the contents of Iceberg data files assigned by the upstream list
/// executor, converts them into stream chunks, and emits them downstream.
/// Pending file scan tasks are persisted in the executor's state table, keyed
/// by data file path.
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core streaming-source state, including the state table that stores
    /// pending file scan tasks.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that assigns the data files to fetch.
    upstream: Option<Executor>,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    streaming_config: Arc<StreamingConfig>,
}

/// All stream chunks read from one data file, together with the progress
/// markers needed to update the executor's state after the file is emitted.
struct ChunksWithState {
    chunks: Vec<StreamChunk>,

    /// Path of the data file these chunks were read from.
    data_file_path: String,

    /// Position of the last row read from the file; currently unused, since
    /// per-file progress is not persisted.
    #[expect(dead_code)]
    last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
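    /// A serializable mirror of [`iceberg::scan::FileScanTask`], persisted in the
    /// fetch executor's state table as JSONB and converted back before scanning.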
    pub struct PersistedFileScanTask {
        /// Start byte offset of the scan range within the data file.
        pub start: u64,
        /// Length in bytes of the scan range.
        pub length: u64,
        /// Number of records in the task, if known.
        pub record_count: Option<u64>,

        /// Path of the data file.
        pub data_file_path: String,

        /// Content type of the file: data, position deletes or equality deletes.
        pub data_file_content: DataContentType,

        /// File format of the data file.
        pub data_file_format: DataFileFormat,

        /// Table schema the scan is based on.
        pub schema: SchemaRef,
        /// Field ids to project.
        pub project_field_ids: Vec<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        /// Optional predicate pushed down to the scan.
        pub predicate: Option<BoundPredicate>,

        /// Delete files that apply to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// Data sequence number of the file.
        pub sequence_number: i64,
        /// Field ids used for equality deletes.
        pub equality_ids: Vec<i32>,
    }

    impl PersistedFileScanTask {
        /// Deserializes a task from the JSONB value stored in the state table.
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// Serializes a task into a JSONB value for the state table.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts the persisted form back into a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::to_task)
                    .collect(),
                sequence_number,
                equality_ids,
            }
        }

        /// Converts a [`FileScanTask`] into its persisted form.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task)
                    .collect(),
                sequence_number,
                equality_ids,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

    #[expect(clippy::too_many_arguments)]
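    /// Loads up to `iceberg_fetch_batch_size` pending file scan tasks from the
    /// state table and installs them as the new data stream; if there is nothing
    /// to fetch, the data side is parked on a pending stream.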
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
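    /// Reads each assigned [`FileScanTask`] from the Iceberg table and yields all
    /// chunks of a file, together with its path and last read position, as one
    /// [`ChunksWithState`].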
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // A scan task is expected to yield at least one chunk, so the last
            // chunk carries the file path and the final read position.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(),
            },
            source_desc.source.config.clone(),
            None,
        )
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
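    /// Main loop: persists file assignments arriving from the upstream list
    /// executor, emits fetched chunks downstream, and rebuilds the batch reader
    /// on barriers whenever the current batch is exhausted or the rate limit
    /// changes.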
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
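                            // Barriers and new file assignments from the upstream
                            // list executor.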
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // The vnode bitmap has changed (e.g. after scaling);
                                        // force a rebuild of the reader from the state table.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                Message::Chunk(chunk) => {
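                                    // Each row from the list executor is a
                                    // (data file path, persisted scan task) pair;
                                    // stash it in the state table until it is fetched.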
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
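                            // A file is read in one go, so once its chunks arrive the
                            // corresponding entry can be dropped from the state table.
                            // (`last_read_pos` is not yet used to persist partial progress.)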
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    file_path_idx,
                                    file_pos_idx,
                                    &source_desc.columns,
                                );
                                let (chunk, op) = chunk.into_parts();
                                // Insert a dummy `_row_id` column (all zeros) at
                                // `row_id_idx` so the chunk matches the output schema.
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}