risingwave_stream/executor/source/iceberg_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::id::SourceId;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
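///
/// The state table keeps roughly one row per pending file assignment: the data file path as the
/// key, and the corresponding [`FileScanTask`] persisted as jsonb as the value
/// (see [`PersistedFileScanTask`]).
///
/// A rough wiring sketch (illustrative only; construction actually happens in the stream executor
/// builder, and `actor_ctx`, `core`, `upstream` and `config` are assumed to come from that
/// context):
///
/// ```ignore
/// let executor = IcebergFetchExecutor::new(actor_ctx, core, upstream, None, config);
/// let mut stream = Box::new(executor).execute(); // yields barriers and data chunks
/// ```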
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    streaming_config: Arc<StreamingConfig>,
}

/// Fetched data from one [`FileScanTask`], along with the state needed for checkpointing.
///
/// Currently one `FileScanTask` maps to exactly one `ChunksWithState`.
/// Once we support reading part of a file, one `FileScanTask` may map to multiple `ChunksWithState`.
pub(crate) struct ChunksWithState {
    /// The actual data chunks read from the file.
    pub chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    pub data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    /// The `FileScanTask` as it is actually persisted in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, the two definitions are the same.
    ///
    /// Possible compatibility issues can be handled in [`Self::from_task`] and [`Self::to_task`];
    /// a version id would need to be introduced at that point.
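    ///
    /// For example, if the persisted layout ever needs to change, a hypothetical versioned
    /// wrapper (not implemented today, shown purely as a sketch) could look like:
    ///
    /// ```ignore
    /// #[derive(Serialize, Deserialize)]
    /// struct VersionedPersistedFileScanTask {
    ///     version: u32,
    ///     task: PersistedFileScanTask,
    /// }
    /// ```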
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter rows.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number of the file.
        pub sequence_number: i64,
        /// The equality ids, if any.
        pub equality_ids: Option<Vec<i32>>,

        /// The size of the file in bytes.
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// First decodes the jsonb value to the persisted struct, then converts it to a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] to a persisted one, then encodes it to a jsonb value.
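        ///
        /// Round-trip sketch (illustrative only; `task` stands for some [`FileScanTask`], and
        /// `JsonbVal::as_scalar_ref` is assumed as the way to borrow the value back as a `JsonbRef`):
        ///
        /// ```ignore
        /// let value: JsonbVal = PersistedFileScanTask::encode(task.clone());
        /// let decoded = PersistedFileScanTask::decode(value.as_scalar_ref())?;
        /// assert_eq!(decoded.data_file_path, task.data_file_path);
        /// ```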
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task back to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                // These fields are not persisted, so they are reset to `None` here.
                partition: None,
                partition_spec: None,
                name_mapping: None,
            }
        }

        /// Converts a [`FileScanTask`] into a persisted one.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Like [`Self::from_task`], but takes a shared `Arc<FileScanTask>` (used for delete files).
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

    /// Rebuilds the data stream by reading the next batch of pending [`FileScanTask`]s
    /// from the state table.
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only fetch a small batch of rows at a
                    // time, so a small-range prefetch is enough; there is no need to establish a
                    // connection for the whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                // Each state table row is (data_file_path, persisted `FileScanTask` as jsonb).
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

    /// Reads the given batch of [`FileScanTask`]s and yields one [`ChunksWithState`] per file.
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true, /* although this column is unnecessary now, we keep it for potential future use */
                    need_file_path_and_pos: true,
                    handle_delete_files: table.metadata().format_version()
                        >= iceberg::spec::FormatVersion::V3,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // We yield once per file for now, because iceberg-rs doesn't support reading part of a file yet.
            // A scan task is expected to yield at least one chunk here.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: SourceId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        // TODO: currently we generate the row id here. For the RisingWave Iceberg table engine,
        // we could potentially use `_risingwave_iceberg_row_id` instead.
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        // Initialize the state table.
        state_store_handler.init_epoch(first_epoch).await?;

        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // On a recovery startup, there may already be file assignments in the state table,
        // so we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler,
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

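        // Main loop. The left side of `stream` carries upstream messages (barriers and new file
        // assignments from the list executor); the right side carries `ChunksWithState` read from
        // Iceberg data files by the current batch reader.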
        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch is preferred (the stream is biased towards upstream messages).
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(fragment_to_apply) => {
                                                if let Some(entry) = fragment_to_apply
                                                    .get(&self.actor_ctx.fragment_id)
                                                    && entry.throttle_type() == ThrottleType::Source
                                                    && entry.rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        entry.rate_limit
                                                    );
                                                    self.rate_limit_rps = entry.rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if post_commit
                                        .post_yield_barrier(update_vnode_bitmap)
                                        .await?
                                        .is_some()
                                    {
                                        // A vnode bitmap update changes which file assignments this executor
                                        // should read. Rebuild the reader to avoid reading splits that no
                                        // longer belong to this actor (e.g., during scale-out).
                                        splits_on_fetch = 0;
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                // Received file assignments from the upstream list executor;
                                // store them in the state table.
                                Message::Chunk(chunk) => {
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Data chunks read from a single Iceberg file by the batch reader.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // TODO: persist read progress once reading part of a file is supported.
                            // For now a file is always fully consumed, so we simply remove it from
                            // the state table.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    &[file_path_idx, file_pos_idx],
                                    &source_desc.columns,
                                );
                                // Pad the row id column with NULL placeholders.
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::zeros(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}