// risingwave_stream/executor/source/iceberg_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
32use risingwave_connector::source::reader::desc::SourceDesc;
33use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
34use risingwave_pb::common::ThrottleType;
35use risingwave_storage::store::PrefetchOptions;
36use thiserror_ext::AsReport;
37
38use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
39use crate::common::rate_limit::limited_chunk_size;
40use crate::executor::prelude::*;
41use crate::executor::stream_reader::StreamReaderWithPause;
42
/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
pub struct IcebergFetchExecutor<S: StateStore> {
    /// Context of the actor running this executor; provides the actor id and
    /// fragment id used when building source contexts and applying barriers.
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    /// Taken (`Option::take`) once at the start of `into_stream`.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    /// Taken (`Option::take`) once at the start of `into_stream`.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    /// (e.g. `iceberg_fetch_batch_size`, `chunk_size`)
    streaming_config: Arc<StreamingConfig>,
}
64
/// Fetched data from 1 [`FileScanTask`], along with states for checkpointing.
///
/// Currently 1 `FileScanTask` -> 1 `ChunksWithState`.
/// Later after we support reading part of a file, we will support 1 `FileScanTask` -> n `ChunksWithState`.
pub(crate) struct ChunksWithState {
    /// The actual data chunks read from the file
    pub chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    pub data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    /// Currently unused (whole files are always read at once), hence the
    /// `dead_code` expectation; it will become meaningful once partial-file
    /// reads are supported.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}
80
pub(super) use state::PersistedFileScanTask;
/// (De)serialization of the per-file scan-task state persisted in the fetch
/// state table (stored as jsonb, keyed by data file path).
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    /// Serde default for [`PersistedFileScanTask::case_sensitive`]: state rows
    /// persisted before this field existed decode as `true`.
    fn default_case_sensitive() -> bool {
        true
    }
    /// This corresponds to the actually persisted `FileScanTask` in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// We can handle possible compatibility issues in [`Self::from_task`] and [`Self::to_task`].
    /// A version id needs to be introduced then.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file.
        /// Persisted recursively with the same struct.
        pub deletes: Vec<PersistedFileScanTask>,
        /// sequence number
        pub sequence_number: i64,
        /// equality ids
        pub equality_ids: Option<Vec<i32>>,

        pub file_size_in_bytes: u64,

        /// Whether column-name matching is case sensitive. Defaults to `true`
        /// when absent in persisted state (see [`default_case_sensitive`]).
        #[serde(default = "default_case_sensitive")]
        pub case_sensitive: bool,
    }

    impl PersistedFileScanTask {
        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                // These fields are not persisted (see the `..` in `from_task`),
                // so they are reset to `None` on recovery.
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive,
            }
        }

        /// Changes a [`FileScanTask`] to a persisted one.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
                // `partition`, `partition_spec` and `name_mapping` are
                // intentionally dropped here; `to_task` restores them as `None`.
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }
        }

        /// Converts a shared [`FileScanTask`] (the element type of `deletes`)
        /// to a persisted one by cloning its fields.
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
                case_sensitive: task.case_sensitive,
            }
        }
    }
}
270
271impl<S: StateStore> IcebergFetchExecutor<S> {
272    pub fn new(
273        actor_ctx: ActorContextRef,
274        stream_source_core: StreamSourceCore<S>,
275        upstream: Executor,
276        rate_limit_rps: Option<u32>,
277        streaming_config: Arc<StreamingConfig>,
278    ) -> Self {
279        Self {
280            actor_ctx,
281            stream_source_core: Some(stream_source_core),
282            upstream: Some(upstream),
283            rate_limit_rps,
284            streaming_config,
285        }
286    }
287
288    #[expect(clippy::too_many_arguments)]
289    async fn replace_with_new_batch_reader<const BIASED: bool>(
290        splits_on_fetch: &mut usize,
291        state_store_handler: &SourceStateTableHandler<S>,
292        column_ids: Vec<ColumnId>,
293        source_ctx: SourceContext,
294        source_desc: SourceDesc,
295        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
296        rate_limit_rps: Option<u32>,
297        streaming_config: Arc<StreamingConfig>,
298    ) -> StreamExecutorResult<()> {
299        let mut batch =
300            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
301        let state_table = state_store_handler.state_table();
302        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
303            let table_iter = state_table
304                .iter_with_vnode(
305                    vnode,
306                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
307                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
308                    PrefetchOptions::prefetch_for_small_range_scan(),
309                )
310                .await?;
311            pin_mut!(table_iter);
312            while let Some(item) = table_iter.next().await {
313                let row = item?;
314                let task = match row.datum_at(1) {
315                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
316                        PersistedFileScanTask::decode(jsonb_ref)?
317                    }
318                    _ => unreachable!(),
319                };
320                batch.push(task);
321
322                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
323                    break 'vnodes;
324                }
325            }
326        }
327        if batch.is_empty() {
328            stream.replace_data_stream(stream::pending().boxed());
329        } else {
330            *splits_on_fetch += batch.len();
331            let batch_reader = Self::build_batched_stream_reader(
332                column_ids,
333                source_ctx,
334                source_desc,
335                batch,
336                rate_limit_rps,
337                streaming_config,
338            )
339            .map_err(StreamExecutorError::connector_error);
340            stream.replace_data_stream(batch_reader);
341        }
342
343        Ok(())
344    }
345
346    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
347    async fn build_batched_stream_reader(
348        _column_ids: Vec<ColumnId>,
349        _source_ctx: SourceContext,
350        source_desc: SourceDesc,
351        batch: Vec<FileScanTask>,
352        _rate_limit_rps: Option<u32>,
353        streaming_config: Arc<StreamingConfig>,
354    ) {
355        let file_path_idx = source_desc
356            .columns
357            .iter()
358            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
359            .unwrap();
360        let file_pos_idx = source_desc
361            .columns
362            .iter()
363            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
364            .unwrap();
365        let properties = source_desc.source.config.clone();
366        let properties = match properties {
367            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
368                iceberg_properties
369            }
370            _ => unreachable!(),
371        };
372        let table = properties.load_table().await?;
373
374        for task in batch {
375            let mut chunks = vec![];
376            #[for_await]
377            for chunk in scan_task_to_chunk_with_deletes(
378                table.clone(),
379                task,
380                IcebergScanOpts {
381                    chunk_size: streaming_config.developer.chunk_size,
382                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
383                    need_file_path_and_pos: true,
384                    handle_delete_files: table.metadata().format_version()
385                        >= iceberg::spec::FormatVersion::V3,
386                },
387                None,
388            ) {
389                let chunk = chunk?;
390                chunks.push(StreamChunk::from_parts(
391                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
392                    chunk,
393                ));
394            }
395            // We yield once for each file now, because iceberg-rs doesn't support read part of a file now.
396            let last_chunk = chunks.last().unwrap();
397            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
398            let data_file_path = last_row
399                .datum_at(file_path_idx)
400                .unwrap()
401                .into_utf8()
402                .to_owned();
403            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
404            yield ChunksWithState {
405                chunks,
406                data_file_path,
407                last_read_pos,
408            };
409        }
410    }
411
412    fn build_source_ctx(
413        &self,
414        source_desc: &SourceDesc,
415        source_id: SourceId,
416        source_name: &str,
417    ) -> SourceContext {
418        SourceContext::new(
419            self.actor_ctx.id,
420            source_id,
421            self.actor_ctx.fragment_id,
422            source_name.to_owned(),
423            source_desc.metrics.clone(),
424            SourceCtrlOpts {
425                chunk_size: limited_chunk_size(self.rate_limit_rps),
426                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
427            },
428            source_desc.source.config.clone(),
429            None,
430        )
431    }
432
433    #[try_stream(ok = Message, error = StreamExecutorError)]
434    async fn into_stream(mut self) {
435        let mut upstream = self.upstream.take().unwrap().execute();
436        let barrier = expect_first_barrier(&mut upstream).await?;
437        let first_epoch = barrier.epoch;
438        let is_pause_on_startup = barrier.is_pause_on_startup();
439        yield Message::Barrier(barrier);
440
441        let mut core = self.stream_source_core.take().unwrap();
442        let mut state_store_handler = core.split_state_store;
443
444        // Build source description from the builder.
445        let source_desc_builder = core.source_desc_builder.take().unwrap();
446
447        let source_desc = source_desc_builder
448            .build()
449            .map_err(StreamExecutorError::connector_error)?;
450
451        let file_path_idx = source_desc
452            .columns
453            .iter()
454            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
455            .unwrap();
456        let file_pos_idx = source_desc
457            .columns
458            .iter()
459            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
460            .unwrap();
461        // TODO: currently we generate row_id here. If for risingwave iceberg table engine, maybe we can use _risingwave_iceberg_row_id instead.
462        let row_id_idx = source_desc
463            .columns
464            .iter()
465            .position(|c| c.name == ROW_ID_COLUMN_NAME)
466            .unwrap();
467        tracing::trace!(
468            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
469            source_desc.columns,
470            file_path_idx,
471            file_pos_idx,
472            row_id_idx
473        );
474        // Initialize state table.
475        state_store_handler.init_epoch(first_epoch).await?;
476
477        let mut splits_on_fetch: usize = 0;
478        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
479            upstream,
480            stream::pending().boxed(),
481        );
482
483        if is_pause_on_startup {
484            stream.pause_stream();
485        }
486
487        // If it is a recovery startup,
488        // there can be file assignments in the state table.
489        // Hence we try building a reader first.
490        Self::replace_with_new_batch_reader(
491            &mut splits_on_fetch,
492            &state_store_handler, // move into the function
493            core.column_ids.clone(),
494            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
495            source_desc.clone(),
496            &mut stream,
497            self.rate_limit_rps,
498            self.streaming_config.clone(),
499        )
500        .await?;
501
502        while let Some(msg) = stream.next().await {
503            match msg {
504                Err(e) => {
505                    tracing::error!(error = %e.as_report(), "Fetch Error");
506                    splits_on_fetch = 0;
507                }
508                Ok(msg) => {
509                    match msg {
510                        // This branch will be preferred.
511                        Either::Left(msg) => {
512                            match msg {
513                                Message::Barrier(barrier) => {
514                                    let mut need_rebuild_reader = false;
515
516                                    if let Some(mutation) = barrier.mutation.as_deref() {
517                                        match mutation {
518                                            Mutation::Pause => stream.pause_stream(),
519                                            Mutation::Resume => stream.resume_stream(),
520                                            Mutation::Throttle(fragment_to_apply) => {
521                                                if let Some(entry) = fragment_to_apply
522                                                    .get(&self.actor_ctx.fragment_id)
523                                                    && entry.throttle_type() == ThrottleType::Source
524                                                    && entry.rate_limit != self.rate_limit_rps
525                                                {
526                                                    tracing::debug!(
527                                                        "updating rate limit from {:?} to {:?}",
528                                                        self.rate_limit_rps,
529                                                        entry.rate_limit
530                                                    );
531                                                    self.rate_limit_rps = entry.rate_limit;
532                                                    need_rebuild_reader = true;
533                                                }
534                                            }
535                                            _ => (),
536                                        }
537                                    }
538
539                                    let post_commit = state_store_handler
540                                        .commit_may_update_vnode_bitmap(barrier.epoch)
541                                        .await?;
542
543                                    let update_vnode_bitmap =
544                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
545                                    // Propagate the barrier.
546                                    yield Message::Barrier(barrier);
547
548                                    if post_commit
549                                        .post_yield_barrier(update_vnode_bitmap)
550                                        .await?
551                                        .is_some()
552                                    {
553                                        // Vnode bitmap update changes which file assignments this executor
554                                        // should read. Rebuild the reader to avoid reading splits that no
555                                        // longer belong to this actor (e.g., during scale-out).
556                                        splits_on_fetch = 0;
557                                    }
558
559                                    if splits_on_fetch == 0 || need_rebuild_reader {
560                                        Self::replace_with_new_batch_reader(
561                                            &mut splits_on_fetch,
562                                            &state_store_handler,
563                                            core.column_ids.clone(),
564                                            self.build_source_ctx(
565                                                &source_desc,
566                                                core.source_id,
567                                                &core.source_name,
568                                            ),
569                                            source_desc.clone(),
570                                            &mut stream,
571                                            self.rate_limit_rps,
572                                            self.streaming_config.clone(),
573                                        )
574                                        .await?;
575                                    }
576                                }
577                                // Receiving file assignments from upstream list executor,
578                                // store into state table.
579                                Message::Chunk(chunk) => {
580                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
581                                        .data_chunk()
582                                        .rows()
583                                        .map(|row| {
584                                            let file_name = row.datum_at(0).unwrap().into_utf8();
585                                            let split = row.datum_at(1).unwrap().into_jsonb();
586                                            (file_name.to_owned(), split.to_owned_scalar())
587                                        })
588                                        .collect();
589                                    state_store_handler.set_states_json(jsonb_values).await?;
590                                    state_store_handler.try_flush().await?;
591                                }
592                                Message::Watermark(_) => unreachable!(),
593                            }
594                        }
595                        // StreamChunk from FsSourceReader, and the reader reads only one file.
596                        Either::Right(ChunksWithState {
597                            chunks,
598                            data_file_path,
599                            last_read_pos: _,
600                        }) => {
601                            // TODO: support persist progress after supporting reading part of a file.
602                            if true {
603                                splits_on_fetch -= 1;
604                                state_store_handler.delete(&data_file_path).await?;
605                            }
606
607                            for chunk in &chunks {
608                                let chunk = prune_additional_cols(
609                                    chunk,
610                                    &[file_path_idx, file_pos_idx],
611                                    &source_desc.columns,
612                                );
613                                // pad row_id
614                                let (chunk, op) = chunk.into_parts();
615                                let (mut columns, visibility) = chunk.into_parts();
616                                columns.insert(
617                                    row_id_idx,
618                                    Arc::new(
619                                        SerialArray::from_iter_bitmap(
620                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
621                                            Bitmap::zeros(columns[0].len()),
622                                        )
623                                        .into(),
624                                    ),
625                                );
626                                let chunk = StreamChunk::from_parts(
627                                    op,
628                                    DataChunk::from_parts(columns.into(), visibility),
629                                );
630
631                                yield Message::Chunk(chunk);
632                            }
633                        }
634                    }
635                }
636            }
637        }
638    }
639}
640
641impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
642    fn execute(self: Box<Self>) -> BoxedMessageStream {
643        self.into_stream().boxed()
644    }
645}
646
647impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
648    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
649        if let Some(core) = &self.stream_source_core {
650            f.debug_struct("IcebergFetchExecutor")
651                .field("source_id", &core.source_id)
652                .field("column_ids", &core.column_ids)
653                .finish()
654        } else {
655            f.debug_struct("IcebergFetchExecutor").finish()
656        }
657    }
658}