risingwave_stream/executor/source/iceberg_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
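///
/// Dataflow (a sketch of what the code below does): the upstream list executor emits file
/// assignments, which are persisted as JSONB into the state table; this executor then scans the
/// state table, builds a batched Iceberg reader over the pending [`FileScanTask`]s, and emits the
/// resulting stream chunks downstream.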
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    streaming_config: Arc<StreamingConfig>,
}

/// Fetched data from one [`FileScanTask`], along with states for checkpointing.
///
/// Currently, one `FileScanTask` maps to exactly one `ChunksWithState`.
/// Once we support reading part of a file, one `FileScanTask` will map to n `ChunksWithState`s.
pub(crate) struct ChunksWithState {
    /// The actual data chunks read from the file
    pub chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    pub data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;
    /// This corresponds to the `FileScanTask` actually persisted in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// Possible compatibility issues can be handled in [`Self::from_task`] and [`Self::to_task`];
    /// a version id would need to be introduced at that point.
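    ///
    /// Round-trip sketch (hypothetical usage, assuming some `task: FileScanTask` is in scope; not
    /// compiled as a doc test):
    ///
    /// ```ignore
    /// use risingwave_common::types::Scalar;
    ///
    /// let jsonb: JsonbVal = PersistedFileScanTask::encode(task.clone());
    /// let decoded: FileScanTask = PersistedFileScanTask::decode(jsonb.as_scalar_ref())?;
    /// assert_eq!(decoded.data_file_path, task.data_file_path);
    /// ```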
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number.
        pub sequence_number: i64,
        /// The equality ids.
        pub equality_ids: Option<Vec<i32>>,

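        /// The size of the file in bytes.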
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
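                // The following fields are not part of `PersistedFileScanTask`, so they are
                // restored as `None` when the task is decoded from the state table.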
                partition: None,
                partition_spec: None,
                name_mapping: None,
            }
        }

        /// Converts a [`FileScanTask`] to a persisted one.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

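        /// Like [`Self::from_task`], but takes an `Arc<FileScanTask>` (as used for delete files)
        /// and clones its fields instead of moving them.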
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

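    /// Scans the state table for pending file assignments (up to
    /// `developer.iceberg_fetch_batch_size` of them) and replaces the data side of `stream` with a
    /// batch reader over those tasks. If there is no pending assignment, the data side is replaced
    /// with a pending stream instead.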
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to prefetch a range of
                    // data at once rather than establish a connection for a whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

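    /// Reads the batched [`FileScanTask`]s one by one and yields one [`ChunksWithState`] per data
    /// file, since iceberg-rs does not yet support reading only part of a file.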
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
                    need_file_path_and_pos: true,
                    handle_delete_files: false,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // We yield once per file for now, because iceberg-rs doesn't support reading part of a file yet.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

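    /// Builds the [`SourceContext`] for this actor. The chunk size is capped by the rate limit via
    /// `limited_chunk_size`, and transactions may be split when rate limiting is enabled.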
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

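    /// The main loop of the executor:
    /// 1. Consume the first barrier and initialize the state table.
    /// 2. Try to build a batch reader from file assignments already persisted (recovery case).
    /// 3. For each subsequent message:
    ///    - barrier: commit state, handle pause/resume/throttle and vnode bitmap updates, and
    ///      rebuild the batch reader when needed;
    ///    - upstream chunk: persist the new file assignments into the state table;
    ///    - fetched chunks: prune the additional columns, pad the row-id column, and emit them.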
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        // TODO: currently we generate the row-id column here. For the RisingWave Iceberg table
        // engine, we may be able to use `_risingwave_iceberg_row_id` instead.
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If it is a recovery startup,
        // there can be file assignments in the state table.
        // Hence we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler, // move into the function
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch is preferred (`BIASED = true`): barriers and other upstream
                        // messages are handled before fetched data.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // Vnode bitmap update changes which file assignments this executor
                                        // should read. Rebuild the reader to avoid reading splits that no
                                        // longer belong to this actor (e.g., during scale-out).
                                        splits_on_fetch = 0;
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                // Receiving file assignments from upstream list executor,
                                // store into state table.
                                Message::Chunk(chunk) => {
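                                    // Each row is (data file path, persisted `FileScanTask` as
                                    // JSONB), as emitted by the upstream list executor.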
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks fetched from the Iceberg batch reader; each `ChunksWithState`
                        // currently covers exactly one data file.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // TODO: persist progress within a file once reading part of a file is
                            // supported. Until then, a finished `ChunksWithState` always covers a
                            // whole file, so its state entry is simply deleted (hence the
                            // placeholder `if true`).
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    &[file_path_idx, file_pos_idx],
                                    &source_desc.columns,
                                );
                                // Pad the row-id column: insert an all-NULL `Serial` array at
                                // `row_id_idx`.
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::zeros(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}