risingwave_stream/executor/source/iceberg_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
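///
/// The high-level flow:
/// 1. The upstream list executor sends file assignments as chunks of
///    `(file path, JSONB-encoded file scan task)`, which are stored in the state table.
/// 2. When the current batch reader is exhausted, up to `iceberg_fetch_batch_size` pending tasks
///    are read back from the state table and scanned via `scan_task_to_chunk`.
/// 3. Emitted chunks have the auxiliary file-path/position columns pruned and a placeholder
///    row-id column padded in before being yielded downstream; once a file is fully read, its
///    state-table row is deleted.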
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    streaming_config: Arc<StreamingConfig>,
}

/// Fetched data from one [`FileScanTask`], along with the state needed for checkpointing.
///
/// Currently, one `FileScanTask` maps to exactly one `ChunksWithState`.
/// Once reading part of a file is supported, one `FileScanTask` may map to multiple `ChunksWithState`s.
struct ChunksWithState {
    /// The actual data chunks read from the file
    chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    #[expect(dead_code)]
    last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;
    /// This corresponds to the `FileScanTask` that is actually persisted in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// Possible compatibility issues can be handled in [`Self::from_task`] and [`Self::to_task`];
    /// a version id would need to be introduced at that point.
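    /// (One way to do that, as a sketch: add a `version: u32` field annotated with
    /// `#[serde(default)]` so that states persisted before the field existed still deserialize,
    /// then branch on it in [`Self::to_task`].)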
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file.
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number of the data file.
        pub sequence_number: i64,
        /// The field ids used for equality deletes.
        pub equality_ids: Vec<i32>,

        /// The size of the data file in bytes.
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// First decodes the JSON into this persisted struct, then converts it into a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] into a persisted one, then encodes it into a JSONB value.
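        ///
        /// For illustration only (the exact encoding of the iceberg types depends on their serde
        /// implementations), the persisted JSONB looks roughly like:
        /// `{"start": 0, "length": 1024, "data_file_path": "s3://bucket/path/to/file.parquet",
        /// "project_field_ids": [1, 2], "deletes": [], "sequence_number": 3, "equality_ids": [],
        /// "file_size_in_bytes": 1024, ...}`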
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::to_task)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Converts a [`FileScanTask`] to a persisted one.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

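    /// Collects up to `iceberg_fetch_batch_size` pending [`FileScanTask`]s from the state table
    /// (scanning the vnodes owned by this actor) and replaces the data stream in `stream` with a
    /// new batch reader over them. If nothing is pending, the data stream is parked on
    /// `stream::pending()` until new assignments arrive.
    ///
    /// `splits_on_fetch` is increased by the number of tasks handed to the new reader.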
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`: we only need to fetch a batch of data
                    // at once rather than establish a connection for a whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

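    /// Builds a stream that reads the given [`FileScanTask`]s one by one via `scan_task_to_chunk`,
    /// converts the resulting data chunks into insert-only [`StreamChunk`]s, and yields one
    /// [`ChunksWithState`] per data file (recording the file path and the last read position).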
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true, /* TODO: this column is not needed, but removing it requires column pruning support in the frontend. */
                    need_file_path_and_pos: true,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // We yield once per file for now, because iceberg-rs doesn't yet support reading part of a file.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

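    /// The main loop of the executor.
    ///
    /// It pairs the upstream list executor with the current batch reader and handles:
    /// - barriers: apply pause/resume/throttle mutations, commit the state table, and rebuild the
    ///   batch reader when it is exhausted, the rate limit changed, or the vnode bitmap may have
    ///   changed;
    /// - upstream chunks: persist the received file assignments into the state table;
    /// - fetched data: delete the finished file from the state table, prune the auxiliary
    ///   columns, pad a placeholder row-id column, and emit the chunks downstream.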
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        // TODO: currently we generate the row id here. For the RisingWave Iceberg table engine,
        // we may be able to use `_risingwave_iceberg_row_id` instead.
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        // Initialize the state table.
        state_store_handler.init_epoch(first_epoch).await?;

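        // `splits_on_fetch` tracks how many file-scan tasks the current batch reader still has to
        // finish. It is increased when a new batch reader is built, decreased when a file has been
        // fully read, and reset to 0 on errors or when the vnode bitmap may have changed, which
        // triggers rebuilding the reader at the next barrier.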
        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If this is a recovery startup, there may already be file assignments in the state table,
        // so we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler, // move into the function
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // If the cache may be stale, we must rebuild the stream to adjust vnode mappings.
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                // Receive file assignments from the upstream list executor and
                                // store them into the state table.
                                Message::Chunk(chunk) => {
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks fetched from the batch reader; each `ChunksWithState` corresponds
                        // to exactly one data file.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // TODO: persist the read progress once reading part of a file is supported.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    file_path_idx,
                                    file_pos_idx,
                                    &source_desc.columns,
                                );
                                // Pad the row id column with placeholder values.
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}