risingwave_stream/executor/source/iceberg_fetch_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use either::Either;
use futures::{StreamExt, TryStreamExt, stream};
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, SerialArray};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};
use risingwave_connector::source::reader::desc::SourceDesc;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
use risingwave_storage::store::PrefetchOptions;
use thiserror_ext::AsReport;

use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// An executor that fetches data from Iceberg tables.
///
/// This executor works with an upstream list executor that provides the list of files to read.
/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
/// downstream.
pub struct IcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    streaming_config: Arc<StreamingConfig>,
}

/// Fetched data from one [`FileScanTask`], along with the state needed for checkpointing.
///
/// Currently, 1 `FileScanTask` -> 1 `ChunksWithState`.
/// Once we support reading part of a file, it will become 1 `FileScanTask` -> n `ChunksWithState`.
struct ChunksWithState {
    /// The actual data chunks read from the file
    chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    #[expect(dead_code)]
    last_read_pos: Datum,
}

pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    /// This corresponds to the `FileScanTask` that is actually persisted in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// Possible compatibility issues can be handled in [`Self::from_task`] and [`Self::to_task`];
    /// a version id will need to be introduced at that point.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file
        pub deletes: Vec<PersistedFileScanTask>,
        /// The sequence number.
        pub sequence_number: i64,
        /// The equality ids.
        pub equality_ids: Vec<i32>,

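        /// The size of the file in bytes.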
        pub file_size_in_bytes: u64,
    }

    impl PersistedFileScanTask {
        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

        /// Converts a [`FileScanTask`] into a persisted task.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
            }
        }

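        /// Converts a shared [`FileScanTask`] into a persisted task by cloning its fields.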
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
            }
        }
    }
}

impl<S: StateStore> IcebergFetchExecutor<S> {
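    /// Creates a new [`IcebergFetchExecutor`].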
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            rate_limit_rps,
            streaming_config,
        }
    }

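    /// Fetches pending [`FileScanTask`]s from the state table (up to
    /// `iceberg_fetch_batch_size` of them) and replaces the data stream in `stream`
    /// with a batch reader over them. If no task is pending, a forever-pending stream
    /// is installed instead, so that only upstream messages are polled.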
    #[expect(clippy::too_many_arguments)]
    async fn replace_with_new_batch_reader<const BIASED: bool>(
        splits_on_fetch: &mut usize,
        state_store_handler: &SourceStateTableHandler<S>,
        column_ids: Vec<ColumnId>,
        source_ctx: SourceContext,
        source_desc: SourceDesc,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        let state_table = state_store_handler.state_table();
        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
            let table_iter = state_table
                .iter_with_vnode(
                    vnode,
                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                    // This usage is similar to `backfill`, so we only need to fetch a large amount of data at once rather than establish a connection for a whole object.
                    PrefetchOptions::prefetch_for_small_range_scan(),
                )
                .await?;
            pin_mut!(table_iter);
            while let Some(item) = table_iter.next().await {
                let row = item?;
                let task = match row.datum_at(1) {
                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                        PersistedFileScanTask::decode(jsonb_ref)?
                    }
                    _ => unreachable!(),
                };
                batch.push(task);

                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
                    break 'vnodes;
                }
            }
        }
        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            *splits_on_fetch += batch.len();
            let batch_reader = Self::build_batched_stream_reader(
                column_ids,
                source_ctx,
                source_desc,
                batch,
                rate_limit_rps,
                streaming_config,
            )
            .map_err(StreamExecutorError::connector_error);
            stream.replace_data_stream(batch_reader);
        }

        Ok(())
    }

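    /// Reads each [`FileScanTask`] in `batch` from the Iceberg table and yields one
    /// [`ChunksWithState`] per data file, recording the file path and last read position
    /// for checkpointing.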
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        _column_ids: Vec<ColumnId>,
        _source_ctx: SourceContext,
        source_desc: SourceDesc,
        batch: Vec<FileScanTask>,
        _rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) {
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true, /* TODO: this column is not needed, but we need to support column pruning in the frontend before we can remove it. */
                    need_file_path_and_pos: true,
                },
                None,
            ) {
                let chunk = chunk?;
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
                    chunk,
                ));
            }
            // We yield once per file for now, because iceberg-rs does not yet support reading part of a file.
            let last_chunk = chunks.last().unwrap();
            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
            let data_file_path = last_row
                .datum_at(file_path_idx)
                .unwrap()
                .into_utf8()
                .to_owned();
            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
            yield ChunksWithState {
                chunks,
                data_file_path,
                last_read_pos,
            };
        }
    }

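    /// Builds a [`SourceContext`] for this actor, with the chunk size derived from the
    /// current rate limit.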
    fn build_source_ctx(
        &self,
        source_desc: &SourceDesc,
        source_id: TableId,
        source_name: &str,
    ) -> SourceContext {
        SourceContext::new(
            self.actor_ctx.id,
            source_id,
            self.actor_ctx.fragment_id,
            source_name.to_owned(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        )
    }

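    /// The main loop of the executor.
    ///
    /// It concurrently polls the upstream list executor and the batch reader:
    /// file assignments received from upstream are persisted into the state table,
    /// while chunks read from Iceberg files have their additional columns pruned and
    /// a placeholder row id column padded before being emitted downstream. Completed
    /// files are deleted from the state table, and the reader is rebuilt whenever it
    /// runs out of files or the rate limit changes.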
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        let first_epoch = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let mut core = self.stream_source_core.take().unwrap();
        let mut state_store_handler = core.split_state_store;

        // Build source description from the builder.
        let source_desc_builder = core.source_desc_builder.take().unwrap();

        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();
        // TODO: currently we generate the row id here. For the RisingWave Iceberg table engine, we may be able to use `_risingwave_iceberg_row_id` instead.
        let row_id_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ROW_ID_COLUMN_NAME)
            .unwrap();
        tracing::trace!(
            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
            source_desc.columns,
            file_path_idx,
            file_pos_idx,
            row_id_idx
        );
        // Initialize state table.
        state_store_handler.init_epoch(first_epoch).await?;

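        // The number of files assigned to the current batch reader that have not been fully read yet.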
        let mut splits_on_fetch: usize = 0;
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

        if is_pause_on_startup {
            stream.pause_stream();
        }

        // If this is a recovery startup,
        // there may already be file assignments in the state table,
        // so we try building a reader first.
        Self::replace_with_new_batch_reader(
            &mut splits_on_fetch,
            &state_store_handler, // move into the function
            core.column_ids.clone(),
            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
            source_desc.clone(),
            &mut stream,
            self.rate_limit_rps,
            self.streaming_config.clone(),
        )
        .await?;

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    splits_on_fetch = 0;
                }
                Ok(msg) => {
                    match msg {
                        // This branch will be preferred.
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;

                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            Mutation::Throttle(actor_to_apply) => {
                                                if let Some(new_rate_limit) =
                                                    actor_to_apply.get(&self.actor_ctx.id)
                                                    && *new_rate_limit != self.rate_limit_rps
                                                {
                                                    tracing::debug!(
                                                        "updating rate limit from {:?} to {:?}",
                                                        self.rate_limit_rps,
                                                        *new_rate_limit
                                                    );
                                                    self.rate_limit_rps = *new_rate_limit;
                                                    need_rebuild_reader = true;
                                                }
                                            }
                                            _ => (),
                                        }
                                    }

                                    let post_commit = state_store_handler
                                        .commit_may_update_vnode_bitmap(barrier.epoch)
                                        .await?;

                                    let update_vnode_bitmap =
                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                                    // Propagate the barrier.
                                    yield Message::Barrier(barrier);

                                    if let Some((_, cache_may_stale)) =
                                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                                    {
                                        // if cache_may_stale, we must rebuild the stream to adjust vnode mappings
                                        if cache_may_stale {
                                            splits_on_fetch = 0;
                                        }
                                    }

                                    if splits_on_fetch == 0 || need_rebuild_reader {
                                        Self::replace_with_new_batch_reader(
                                            &mut splits_on_fetch,
                                            &state_store_handler,
                                            core.column_ids.clone(),
                                            self.build_source_ctx(
                                                &source_desc,
                                                core.source_id,
                                                &core.source_name,
                                            ),
                                            source_desc.clone(),
                                            &mut stream,
                                            self.rate_limit_rps,
                                            self.streaming_config.clone(),
                                        )
                                        .await?;
                                    }
                                }
                                // Receive file assignments from the upstream list executor
                                // and store them into the state table.
                                Message::Chunk(chunk) => {
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    state_store_handler.set_states_json(jsonb_values).await?;
                                    state_store_handler.try_flush().await?;
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // Chunks read from the batch reader; each `ChunksWithState` covers exactly one data file.
                        Either::Right(ChunksWithState {
                            chunks,
                            data_file_path,
                            last_read_pos: _,
                        }) => {
                            // TODO: persist read progress once reading part of a file is supported.
                            if true {
                                splits_on_fetch -= 1;
                                state_store_handler.delete(&data_file_path).await?;
                            }

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    file_path_idx,
                                    file_pos_idx,
                                    &source_desc.columns,
                                );
                                // Pad the row id column with placeholder values.
                                let (chunk, op) = chunk.into_parts();
                                let (mut columns, visibility) = chunk.into_parts();
                                columns.insert(
                                    row_id_idx,
                                    Arc::new(
                                        SerialArray::from_iter_bitmap(
                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
                                            Bitmap::ones(columns[0].len()),
                                        )
                                        .into(),
                                    ),
                                );
                                let chunk = StreamChunk::from_parts(
                                    op,
                                    DataChunk::from_parts(columns.into(), visibility),
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }
}

impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("IcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("IcebergFetchExecutor").finish()
        }
    }
}