risingwave_stream/executor/source/
iceberg_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26    TableId,
27};
28use risingwave_common::config::StreamingConfig;
29use risingwave_common::hash::VnodeBitmapExt;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};
32use risingwave_connector::source::reader::desc::SourceDesc;
33use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
34use risingwave_storage::store::PrefetchOptions;
35use thiserror_ext::AsReport;
36
37use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
38use crate::common::rate_limit::limited_chunk_size;
39use crate::executor::prelude::*;
40use crate::executor::stream_reader::StreamReaderWithPause;
41
/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
///
/// File assignments received from upstream are written to the state table owned by
/// `stream_source_core`, and an assignment is deleted from it again once the chunks of that
/// file have been received from the batch reader.
pub struct IcebergFetchExecutor<S: StateStore> {
    /// Context of the actor running this executor. Its actor id and fragment id are used when
    /// building the source context and when looking up throttle / vnode-bitmap updates.
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    ///
    /// Stored as an `Option` so it can be moved out with `take()` when the executor is turned
    /// into a stream.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    ///
    /// Stored as an `Option` so it can be moved out with `take()` when the executor is turned
    /// into a stream.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed.
    ///
    /// May be replaced at runtime by a `Throttle` mutation addressed to this actor.
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    /// (the fetch batch size and the chunk size are read from its `developer` section).
    streaming_config: Arc<StreamingConfig>,
}
63
/// Fetched data from 1 [`FileScanTask`], along with states for checkpointing.
///
/// Currently 1 `FileScanTask` -> 1 `ChunksWithState`.
/// Later after we support reading part of a file, we will support 1 `FileScanTask` -> n `ChunksWithState`.
struct ChunksWithState {
    /// The actual data chunks read from the file; every row carries `Op::Insert`.
    chunks: Vec<StreamChunk>,

    /// Path to the data file. The executor deletes the state-table entry stored under this
    /// path once the chunks have been received.
    data_file_path: String,

    /// The last read position in the file, intended for checkpointing.
    ///
    /// Not read anywhere yet (hence the `dead_code` expectation); it is reserved for the time
    /// when partial reads of a file are supported.
    #[expect(dead_code)]
    last_read_pos: Datum,
}
79
80pub(super) use state::PersistedFileScanTask;
81mod state {
82    use anyhow::Context;
83    use iceberg::expr::BoundPredicate;
84    use iceberg::scan::FileScanTask;
85    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
86    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
87    use serde::{Deserialize, Serialize};
88
89    use crate::executor::StreamExecutorResult;
    /// This corresponds to the actually persisted `FileScanTask` in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// We can handle possible compatibility issues in [`Self::from_task`] and [`Self::to_task`].
    /// A version id needs to be introduced then.
    ///
    /// The struct is serialized to / deserialized from JSON via serde, so renaming or removing
    /// a field changes the persisted representation.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        ///
        /// Omitted from the serialized JSON when it is `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file.
        ///
        /// Each entry is itself a persisted task and is converted recursively.
        pub deletes: Vec<PersistedFileScanTask>,
        /// Sequence number, copied verbatim from the [`FileScanTask`].
        pub sequence_number: i64,
        /// Equality ids, copied verbatim from the [`FileScanTask`].
        pub equality_ids: Vec<i32>,
    }
133
134    impl PersistedFileScanTask {
135        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
136        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
137            let persisted_task: Self =
138                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
139                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
140            Ok(Self::to_task(persisted_task))
141        }
142
143        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
144        pub fn encode(task: FileScanTask) -> JsonbVal {
145            let persisted_task = Self::from_task(task);
146            serde_json::to_value(persisted_task).unwrap().into()
147        }
148
149        /// Converts a persisted task to a [`FileScanTask`].
150        fn to_task(
151            Self {
152                start,
153                length,
154                record_count,
155                data_file_path,
156                data_file_content,
157                data_file_format,
158                schema,
159                project_field_ids,
160                predicate,
161                deletes,
162                sequence_number,
163                equality_ids,
164            }: Self,
165        ) -> FileScanTask {
166            FileScanTask {
167                start,
168                length,
169                record_count,
170                data_file_path,
171                data_file_content,
172                data_file_format,
173                schema,
174                project_field_ids,
175                predicate,
176                deletes: deletes
177                    .into_iter()
178                    .map(PersistedFileScanTask::to_task)
179                    .collect(),
180                sequence_number,
181                equality_ids,
182            }
183        }
184
185        /// Changes a [`FileScanTask`] to a persisted one.
186        fn from_task(
187            FileScanTask {
188                start,
189                length,
190                record_count,
191                data_file_path,
192                data_file_content,
193                data_file_format,
194                schema,
195                project_field_ids,
196                predicate,
197                deletes,
198                sequence_number,
199                equality_ids,
200            }: FileScanTask,
201        ) -> Self {
202            Self {
203                start,
204                length,
205                record_count,
206                data_file_path,
207                data_file_content,
208                data_file_format,
209                schema,
210                project_field_ids,
211                predicate,
212                deletes: deletes
213                    .into_iter()
214                    .map(PersistedFileScanTask::from_task)
215                    .collect(),
216                sequence_number,
217                equality_ids,
218            }
219        }
220    }
221}
222
223impl<S: StateStore> IcebergFetchExecutor<S> {
224    pub fn new(
225        actor_ctx: ActorContextRef,
226        stream_source_core: StreamSourceCore<S>,
227        upstream: Executor,
228        rate_limit_rps: Option<u32>,
229        streaming_config: Arc<StreamingConfig>,
230    ) -> Self {
231        Self {
232            actor_ctx,
233            stream_source_core: Some(stream_source_core),
234            upstream: Some(upstream),
235            rate_limit_rps,
236            streaming_config,
237        }
238    }
239
240    #[expect(clippy::too_many_arguments)]
241    async fn replace_with_new_batch_reader<const BIASED: bool>(
242        splits_on_fetch: &mut usize,
243        state_store_handler: &SourceStateTableHandler<S>,
244        column_ids: Vec<ColumnId>,
245        source_ctx: SourceContext,
246        source_desc: SourceDesc,
247        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
248        rate_limit_rps: Option<u32>,
249        streaming_config: Arc<StreamingConfig>,
250    ) -> StreamExecutorResult<()> {
251        let mut batch =
252            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
253        let state_table = state_store_handler.state_table();
254        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
255            let table_iter = state_table
256                .iter_with_vnode(
257                    vnode,
258                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
259                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
260                    PrefetchOptions::prefetch_for_small_range_scan(),
261                )
262                .await?;
263            pin_mut!(table_iter);
264            while let Some(item) = table_iter.next().await {
265                let row = item?;
266                let task = match row.datum_at(1) {
267                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
268                        PersistedFileScanTask::decode(jsonb_ref)?
269                    }
270                    _ => unreachable!(),
271                };
272                batch.push(task);
273
274                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
275                    break 'vnodes;
276                }
277            }
278        }
279        if batch.is_empty() {
280            stream.replace_data_stream(stream::pending().boxed());
281        } else {
282            *splits_on_fetch += batch.len();
283            let batch_reader = Self::build_batched_stream_reader(
284                column_ids,
285                source_ctx,
286                source_desc,
287                batch,
288                rate_limit_rps,
289                streaming_config,
290            )
291            .map_err(StreamExecutorError::connector_error);
292            stream.replace_data_stream(batch_reader);
293        }
294
295        Ok(())
296    }
297
298    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
299    async fn build_batched_stream_reader(
300        _column_ids: Vec<ColumnId>,
301        _source_ctx: SourceContext,
302        source_desc: SourceDesc,
303        batch: Vec<FileScanTask>,
304        _rate_limit_rps: Option<u32>,
305        streaming_config: Arc<StreamingConfig>,
306    ) {
307        let file_path_idx = source_desc
308            .columns
309            .iter()
310            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
311            .unwrap();
312        let file_pos_idx = source_desc
313            .columns
314            .iter()
315            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
316            .unwrap();
317        let properties = source_desc.source.config.clone();
318        let properties = match properties {
319            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
320                iceberg_properties
321            }
322            _ => unreachable!(),
323        };
324        let table = properties.load_table().await?;
325
326        for task in batch {
327            let mut chunks = vec![];
328            #[for_await]
329            for chunk in scan_task_to_chunk(
330                table.clone(),
331                task,
332                IcebergScanOpts {
333                    chunk_size: streaming_config.developer.chunk_size,
334                    need_seq_num: true, /* TODO: this column is not needed. But need to support col pruning in frontend to remove it. */
335                    need_file_path_and_pos: true,
336                },
337                None,
338            ) {
339                let chunk = chunk?;
340                chunks.push(StreamChunk::from_parts(
341                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
342                    chunk,
343                ));
344            }
345            // We yield once for each file now, because iceberg-rs doesn't support read part of a file now.
346            let last_chunk = chunks.last().unwrap();
347            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
348            let data_file_path = last_row
349                .datum_at(file_path_idx)
350                .unwrap()
351                .into_utf8()
352                .to_owned();
353            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
354            yield ChunksWithState {
355                chunks,
356                data_file_path,
357                last_read_pos,
358            };
359        }
360    }
361
362    fn build_source_ctx(
363        &self,
364        source_desc: &SourceDesc,
365        source_id: TableId,
366        source_name: &str,
367    ) -> SourceContext {
368        SourceContext::new(
369            self.actor_ctx.id,
370            source_id,
371            self.actor_ctx.fragment_id,
372            source_name.to_owned(),
373            source_desc.metrics.clone(),
374            SourceCtrlOpts {
375                chunk_size: limited_chunk_size(self.rate_limit_rps),
376                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
377            },
378            source_desc.source.config.clone(),
379            None,
380        )
381    }
382
    /// Runs the executor and produces its output message stream.
    ///
    /// Overview of the loop:
    /// 1. Wait for the first barrier from upstream, forward it, and initialize the state table
    ///    with its epoch.
    /// 2. Try to build a batch reader right away, since the state table may already contain
    ///    file assignments.
    /// 3. Merge upstream messages with fetched data through a [`StreamReaderWithPause`]:
    ///    - upstream chunks (file assignments) are written to the state table;
    ///    - barriers apply pause / resume / throttle mutations, commit the state table, are
    ///      forwarded, and may trigger building the next batch reader;
    ///    - fetched files are removed from the state table and their chunks are emitted with
    ///      the hidden file-path / file-pos columns pruned and a row-id column inserted.
    ///
    /// # Panics
    ///
    /// Panics if the executor was already consumed, if the source description lacks the
    /// file-path, file-pos or row-id column, or if upstream sends a watermark.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Positions of the hidden columns within `source_desc.columns`.
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        // TODO: currently we generate row_id here. If for risingwave iceberg table engine, maybe we can use _risingwave_iceberg_row_id instead.
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

        // Number of files handed to the current batch reader that have not been received yet.
        let mut splits_on_fetch: usize = 0;
        // Start with a data stream that never yields; it is replaced once a batch is available.
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If it is a recovery startup,
        // there can be file assignments in the state table.
        // Hence we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler, // borrowed; the handler stays owned by this function
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // The error is only logged, not propagated. Resetting the counter makes
                    // the next barrier build a fresh reader from the state table.
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                // Only react when this actor is addressed and
                                                // the limit actually changes.
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    // Commit the state table for this epoch before the barrier
                                    // is forwarded.
                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    // Extracted before the barrier is moved into the yield.
                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // if cache_may_stale, we must rebuild the stream to adjust vnode mappings
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    // Build the next reader when the current batch is fully
                                    // consumed (or was reset above), or when the rate limit
                                    // changed.
                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                // Receiving file assignments from upstream list executor,
                                // store into state table.
                                Message::Chunk(chunk) => {
                                    // Column 0 is the file name (utf8), column 1 the jsonb
                                    // encoded scan task.
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // All chunks of one data file, produced by the batch reader.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // TODO: support persist progress after supporting reading part of a file.
                            // `if true` is a placeholder for a future "file fully read" check:
                            // for now every received file is complete and gets retired.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    file_path_idx,
                                    file_pos_idx,
                                    &source_desc.columns,
                                );
                                // pad row_id
                                // The inserted column holds `Serial` 0 for every row.
                                // NOTE(review): presumably a placeholder that is filled in
                                // downstream — confirm.
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
587}
588
589impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
590    fn execute(self: Box<Self>) -> BoxedMessageStream {
591        self.into_stream().boxed()
592    }
593}
594
595impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
596    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
597        if let Some(core) = &self.stream_source_core {
598            f.debug_struct("IcebergFetchExecutor")
599                .field("source_id", &core.source_id)
600                .field("column_ids", &core.column_ids)
601                .finish()
602        } else {
603            f.debug_struct("IcebergFetchExecutor").finish()
604        }
605    }
606}