Skip to main content

risingwave_stream/executor/source/
iceberg_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
32use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
33use risingwave_connector::source::reader::desc::SourceDesc;
34use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
35use risingwave_pb::common::ThrottleType;
36use risingwave_storage::store::PrefetchOptions;
37use thiserror_ext::AsReport;
38
39use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
40use crate::common::rate_limit::limited_chunk_size;
41use crate::executor::prelude::*;
42use crate::executor::stream_reader::StreamReaderWithPause;
43
44/// An executor that fetches data from Iceberg tables.
45///
46/// This executor works with an upstream list executor that provides the list of files to read.
47/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
48/// downstream.
49pub struct IcebergFetchExecutor<S: StateStore> {
50    actor_ctx: ActorContextRef,
51
52    /// Core component for managing external streaming source state
53    stream_source_core: Option<StreamSourceCore<S>>,
54
55    /// Upstream list executor that provides the list of files to read.
56    /// This executor is responsible for discovering new files and changes in the Iceberg table.
57    upstream: Option<Executor>,
58
59    /// Optional rate limit in rows/s to control data ingestion speed
60    rate_limit_rps: Option<u32>,
61
62    /// Configuration for streaming operations, including Iceberg-specific settings
63    streaming_config: Arc<StreamingConfig>,
64}
65
66/// Fetched data from 1 [`FileScanTask`], along with states for checkpointing.
67///
68/// Currently 1 `FileScanTask` -> 1 `ChunksWithState`.
69/// Later after we support reading part of a file, we will support 1 `FileScanTask` -> n `ChunksWithState`.
70pub(crate) struct ChunksWithState {
71    /// The actual data chunks read from the file
72    pub chunks: Vec<StreamChunk>,
73
74    /// Path to the data file, used for checkpointing and error reporting.
75    pub data_file_path: String,
76
77    /// The last read position in the file, used for checkpointing.
78    #[expect(dead_code)]
79    pub last_read_pos: Datum,
80}
81
82pub(super) use state::PersistedFileScanTask;
83mod state {
84    use std::sync::Arc;
85
86    use anyhow::Context;
87    use iceberg::expr::BoundPredicate;
88    use iceberg::scan::FileScanTask;
89    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
90    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
91    use serde::{Deserialize, Serialize};
92
93    use crate::executor::StreamExecutorResult;
94
95    fn default_case_sensitive() -> bool {
96        true
97    }
98    /// This corresponds to the actually persisted `FileScanTask` in the state table.
99    ///
100    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
101    /// Currently, they have the same definition.
102    ///
103    /// We can handle possible compatibility issues in [`Self::from_task`] and [`Self::to_task`].
104    /// A version id needs to be introduced then.
105    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106    pub struct PersistedFileScanTask {
107        /// The start offset of the file to scan.
108        pub start: u64,
109        /// The length of the file to scan.
110        pub length: u64,
111        /// The number of records in the file to scan.
112        ///
113        /// This is an optional field, and only available if we are
114        /// reading the entire data file.
115        pub record_count: Option<u64>,
116
117        /// The data file path corresponding to the task.
118        pub data_file_path: String,
119
120        /// The data file referenced by a deletion vector.
121        #[serde(default, skip_serializing_if = "Option::is_none")]
122        pub referenced_data_file: Option<String>,
123
124        /// The content type of the file to scan.
125        pub data_file_content: DataContentType,
126
127        /// The format of the file to scan.
128        pub data_file_format: DataFileFormat,
129
130        /// The schema of the file to scan.
131        pub schema: SchemaRef,
132        /// The field ids to project.
133        pub project_field_ids: Vec<i32>,
134        /// The predicate to filter.
135        #[serde(skip_serializing_if = "Option::is_none")]
136        pub predicate: Option<BoundPredicate>,
137
138        /// The list of delete files that may need to be applied to this data file
139        pub deletes: Vec<PersistedFileScanTask>,
140        /// sequence number
141        pub sequence_number: i64,
142        /// equality ids
143        pub equality_ids: Option<Vec<i32>>,
144
145        pub file_size_in_bytes: u64,
146
147        #[serde(default = "default_case_sensitive")]
148        pub case_sensitive: bool,
149    }
150
151    impl PersistedFileScanTask {
152        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
153        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
154            let persisted_task: Self =
155                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
156                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
157            Ok(Self::to_task(persisted_task))
158        }
159
160        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
161        pub fn encode(task: FileScanTask) -> JsonbVal {
162            let persisted_task = Self::from_task(task);
163            serde_json::to_value(persisted_task).unwrap().into()
164        }
165
166        /// Converts a persisted task to a [`FileScanTask`].
167        fn to_task(
168            Self {
169                start,
170                length,
171                record_count,
172                data_file_path,
173                referenced_data_file,
174                data_file_content,
175                data_file_format,
176                schema,
177                project_field_ids,
178                predicate,
179                deletes,
180                sequence_number,
181                equality_ids,
182                file_size_in_bytes,
183                case_sensitive,
184            }: Self,
185        ) -> FileScanTask {
186            FileScanTask {
187                start,
188                length,
189                record_count,
190                data_file_path,
191                referenced_data_file,
192                data_file_content,
193                data_file_format,
194                schema,
195                project_field_ids,
196                predicate,
197                deletes: deletes
198                    .into_iter()
199                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
200                    .collect(),
201                sequence_number,
202                equality_ids,
203                file_size_in_bytes,
204                partition: None,
205                partition_spec: None,
206                name_mapping: None,
207                case_sensitive,
208            }
209        }
210
211        /// Changes a [`FileScanTask`] to a persisted one.
212        fn from_task(
213            FileScanTask {
214                start,
215                length,
216                record_count,
217                data_file_path,
218                referenced_data_file,
219                data_file_content,
220                data_file_format,
221                schema,
222                project_field_ids,
223                predicate,
224                deletes,
225                sequence_number,
226                equality_ids,
227                file_size_in_bytes,
228                case_sensitive,
229                ..
230            }: FileScanTask,
231        ) -> Self {
232            Self {
233                start,
234                length,
235                record_count,
236                data_file_path,
237                referenced_data_file,
238                data_file_content,
239                data_file_format,
240                schema,
241                project_field_ids,
242                predicate,
243                deletes: deletes
244                    .into_iter()
245                    .map(PersistedFileScanTask::from_task_ref)
246                    .collect(),
247                sequence_number,
248                equality_ids,
249                file_size_in_bytes,
250                case_sensitive,
251            }
252        }
253
254        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
255            Self {
256                start: task.start,
257                length: task.length,
258                record_count: task.record_count,
259                data_file_path: task.data_file_path.clone(),
260                referenced_data_file: task.referenced_data_file.clone(),
261                data_file_content: task.data_file_content,
262                data_file_format: task.data_file_format,
263                schema: task.schema.clone(),
264                project_field_ids: task.project_field_ids.clone(),
265                predicate: task.predicate.clone(),
266                deletes: task
267                    .deletes
268                    .iter()
269                    .cloned()
270                    .map(PersistedFileScanTask::from_task_ref)
271                    .collect(),
272                sequence_number: task.sequence_number,
273                equality_ids: task.equality_ids.clone(),
274                file_size_in_bytes: task.file_size_in_bytes,
275                case_sensitive: task.case_sensitive,
276            }
277        }
278    }
279}
280
281impl<S: StateStore> IcebergFetchExecutor<S> {
282    pub fn new(
283        actor_ctx: ActorContextRef,
284        stream_source_core: StreamSourceCore<S>,
285        upstream: Executor,
286        rate_limit_rps: Option<u32>,
287        streaming_config: Arc<StreamingConfig>,
288    ) -> Self {
289        Self {
290            actor_ctx,
291            stream_source_core: Some(stream_source_core),
292            upstream: Some(upstream),
293            rate_limit_rps,
294            streaming_config,
295        }
296    }
297
298    #[expect(clippy::too_many_arguments)]
299    async fn replace_with_new_batch_reader<const BIASED: bool>(
300        splits_on_fetch: &mut usize,
301        state_store_handler: &SourceStateTableHandler<S>,
302        column_ids: Vec<ColumnId>,
303        source_ctx: SourceContext,
304        source_desc: SourceDesc,
305        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
306        rate_limit_rps: Option<u32>,
307        streaming_config: Arc<StreamingConfig>,
308    ) -> StreamExecutorResult<()> {
309        let mut batch =
310            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
311        let state_table = state_store_handler.state_table();
312        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
313            let table_iter = state_table
314                .iter_with_vnode(
315                    vnode,
316                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
317                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
318                    PrefetchOptions::prefetch_for_small_range_scan(),
319                )
320                .await?;
321            pin_mut!(table_iter);
322            while let Some(item) = table_iter.next().await {
323                let row = item?;
324                let task = match row.datum_at(1) {
325                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
326                        PersistedFileScanTask::decode(jsonb_ref)?
327                    }
328                    _ => unreachable!(),
329                };
330                batch.push(task);
331
332                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
333                    break 'vnodes;
334                }
335            }
336        }
337        if batch.is_empty() {
338            stream.replace_data_stream(stream::pending().boxed());
339        } else {
340            *splits_on_fetch += batch.len();
341            let batch_reader = Self::build_batched_stream_reader(
342                column_ids,
343                source_ctx,
344                source_desc,
345                batch,
346                rate_limit_rps,
347                streaming_config,
348            )
349            .map_err(StreamExecutorError::connector_error);
350            stream.replace_data_stream(batch_reader);
351        }
352
353        Ok(())
354    }
355
356    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
357    async fn build_batched_stream_reader(
358        _column_ids: Vec<ColumnId>,
359        _source_ctx: SourceContext,
360        source_desc: SourceDesc,
361        batch: Vec<FileScanTask>,
362        _rate_limit_rps: Option<u32>,
363        streaming_config: Arc<StreamingConfig>,
364    ) {
365        let file_path_idx = source_desc
366            .columns
367            .iter()
368            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
369            .unwrap();
370        let file_pos_idx = source_desc
371            .columns
372            .iter()
373            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
374            .unwrap();
375        let properties = source_desc.source.config.clone();
376        let properties = match properties {
377            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
378                iceberg_properties
379            }
380            _ => unreachable!(),
381        };
382        let table = properties.load_table().await?;
383        let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
384
385        for task in batch {
386            // Capture the file path upfront from the task so we can use it even when the
387            // scan produces no chunks (empty data file or fully equality-deleted file).
388            let task_data_file_path = task.data_file_path.clone();
389            let mut chunks = vec![];
390            #[for_await]
391            for chunk in scan_task_to_chunk_with_deletes(
392                table.clone(),
393                task,
394                IcebergScanOpts {
395                    chunk_size: streaming_config.developer.chunk_size,
396                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
397                    need_file_path_and_pos: true,
398                    handle_delete_files: table.metadata().format_version()
399                        >= iceberg::spec::FormatVersion::V3,
400                },
401                Some(metrics.clone()),
402            ) {
403                let chunk = chunk?;
404                // Skip zero-cardinality chunks: a RecordBatch with 0 visible rows after
405                // predicate/delete filtering would cause `cardinality() - 1` to underflow
406                // below when we extract the last-row metadata. Skipping here is safe
407                // because the existing `task_data_file_path` fallback already covers
408                // the case where no readable rows are produced.
409                if chunk.cardinality() == 0 {
410                    continue;
411                }
412                chunks.push(StreamChunk::from_parts(
413                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
414                    chunk,
415                ));
416            }
417            // We yield once for each file now, because iceberg-rs doesn't support read part of a file now.
418            // We must always yield — even for an empty task — so that `into_stream` can
419            // decrement `splits_on_fetch` and delete the file assignment from the state
420            // table.  Skipping the yield (e.g. with `continue`) would leave the file
421            // stuck in the state table and prevent subsequent batches from progressing.
422            let (data_file_path, last_read_pos) = if let Some(last_chunk) = chunks.last() {
423                let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
424                let path = last_row
425                    .datum_at(file_path_idx)
426                    .unwrap()
427                    .into_utf8()
428                    .to_owned();
429                let pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
430                (path, pos)
431            } else {
432                // No rows were produced: fall back to the task's own path so the
433                // consumer can still remove the state entry for this file.
434                (task_data_file_path, None)
435            };
436            yield ChunksWithState {
437                chunks,
438                data_file_path,
439                last_read_pos,
440            };
441        }
442    }
443
444    fn build_source_ctx(
445        &self,
446        source_desc: &SourceDesc,
447        source_id: SourceId,
448        source_name: &str,
449    ) -> SourceContext {
450        SourceContext::new(
451            self.actor_ctx.id,
452            source_id,
453            self.actor_ctx.fragment_id,
454            source_name.to_owned(),
455            source_desc.metrics.clone(),
456            SourceCtrlOpts {
457                chunk_size: limited_chunk_size(self.rate_limit_rps),
458                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
459            },
460            source_desc.source.config.clone(),
461            None,
462        )
463    }
464
465    #[try_stream(ok = Message, error = StreamExecutorError)]
466    async fn into_stream(mut self) {
467        let mut upstream = self.upstream.take().unwrap().execute();
468        let barrier = expect_first_barrier(&mut upstream).await?;
469        let first_epoch = barrier.epoch;
470        let is_pause_on_startup = barrier.is_pause_on_startup();
471        yield Message::Barrier(barrier);
472
473        let mut core = self.stream_source_core.take().unwrap();
474        let mut state_store_handler = core.split_state_store;
475
476        // Build source description from the builder.
477        let source_desc_builder = core.source_desc_builder.take().unwrap();
478
479        let source_desc = source_desc_builder
480            .build()
481            .map_err(StreamExecutorError::connector_error)?;
482
483        let file_path_idx = source_desc
484            .columns
485            .iter()
486            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
487            .unwrap();
488        let file_pos_idx = source_desc
489            .columns
490            .iter()
491            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
492            .unwrap();
493        // TODO: currently we generate row_id here. If for risingwave iceberg table engine, maybe we can use _risingwave_iceberg_row_id instead.
494        let row_id_idx = source_desc
495            .columns
496            .iter()
497            .position(|c| c.name == ROW_ID_COLUMN_NAME)
498            .unwrap();
499        tracing::trace!(
500            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
501            source_desc.columns,
502            file_path_idx,
503            file_pos_idx,
504            row_id_idx
505        );
506        // Initialize state table.
507        state_store_handler.init_epoch(first_epoch).await?;
508
509        // Extract table name from iceberg properties for metrics labeling.
510        let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
511        let iceberg_table_name = {
512            match &source_desc.source.config {
513                risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
514                    props.table.table_name().to_owned()
515                }
516                _ => unreachable!("IcebergFetchExecutor must be built with Iceberg properties"),
517            }
518        };
519        let source_id_str = core.source_id.to_string();
520        let source_name_str = core.source_name.clone();
521        let metrics_labels = [
522            source_id_str.as_str(),
523            source_name_str.as_str(),
524            iceberg_table_name.as_str(),
525        ];
526
527        let mut splits_on_fetch: usize = 0;
528        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
529            upstream,
530            stream::pending().boxed(),
531        );
532
533        if is_pause_on_startup {
534            stream.pause_stream();
535        }
536
537        // If it is a recovery startup,
538        // there can be file assignments in the state table.
539        // Hence we try building a reader first.
540        Self::replace_with_new_batch_reader(
541            &mut splits_on_fetch,
542            &state_store_handler, // move into the function
543            core.column_ids.clone(),
544            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
545            source_desc.clone(),
546            &mut stream,
547            self.rate_limit_rps,
548            self.streaming_config.clone(),
549        )
550        .await?;
551        iceberg_metrics
552            .iceberg_source_inflight_file_count
553            .with_guarded_label_values(&metrics_labels)
554            .set(splits_on_fetch as i64);
555
556        while let Some(msg) = stream.next().await {
557            match msg {
558                Err(e) => {
559                    tracing::error!(error = %e.as_report(), "Fetch Error");
560                    iceberg_metrics
561                        .iceberg_source_scan_errors_total
562                        .with_guarded_label_values(&[
563                            metrics_labels[0],
564                            metrics_labels[1],
565                            metrics_labels[2],
566                            "fetch_error",
567                        ])
568                        .inc();
569                    splits_on_fetch = 0;
570                    iceberg_metrics
571                        .iceberg_source_inflight_file_count
572                        .with_guarded_label_values(&metrics_labels)
573                        .set(0);
574                }
575                Ok(msg) => {
576                    match msg {
577                        // This branch will be preferred.
578                        Either::Left(msg) => {
579                            match msg {
580                                Message::Barrier(barrier) => {
581                                    let mut need_rebuild_reader = false;
582
583                                    if let Some(mutation) = barrier.mutation.as_deref() {
584                                        match mutation {
585                                            Mutation::Pause => stream.pause_stream(),
586                                            Mutation::Resume => stream.resume_stream(),
587                                            Mutation::Throttle(fragment_to_apply) => {
588                                                if let Some(entry) = fragment_to_apply
589                                                    .get(&self.actor_ctx.fragment_id)
590                                                    && entry.throttle_type() == ThrottleType::Source
591                                                    && entry.rate_limit != self.rate_limit_rps
592                                                {
593                                                    tracing::debug!(
594                                                        "updating rate limit from {:?} to {:?}",
595                                                        self.rate_limit_rps,
596                                                        entry.rate_limit
597                                                    );
598                                                    self.rate_limit_rps = entry.rate_limit;
599                                                    need_rebuild_reader = true;
600                                                }
601                                            }
602                                            _ => (),
603                                        }
604                                    }
605
606                                    let post_commit = state_store_handler
607                                        .commit_may_update_vnode_bitmap(barrier.epoch)
608                                        .await?;
609
610                                    let update_vnode_bitmap =
611                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
612                                    // Propagate the barrier.
613                                    yield Message::Barrier(barrier);
614
615                                    if post_commit
616                                        .post_yield_barrier(update_vnode_bitmap)
617                                        .await?
618                                        .is_some()
619                                    {
620                                        // Vnode bitmap update changes which file assignments this executor
621                                        // should read. Rebuild the reader to avoid reading splits that no
622                                        // longer belong to this actor (e.g., during scale-out).
623                                        splits_on_fetch = 0;
624                                    }
625
626                                    if splits_on_fetch == 0 || need_rebuild_reader {
627                                        Self::replace_with_new_batch_reader(
628                                            &mut splits_on_fetch,
629                                            &state_store_handler,
630                                            core.column_ids.clone(),
631                                            self.build_source_ctx(
632                                                &source_desc,
633                                                core.source_id,
634                                                &core.source_name,
635                                            ),
636                                            source_desc.clone(),
637                                            &mut stream,
638                                            self.rate_limit_rps,
639                                            self.streaming_config.clone(),
640                                        )
641                                        .await?;
642                                        iceberg_metrics
643                                            .iceberg_source_inflight_file_count
644                                            .with_guarded_label_values(&metrics_labels)
645                                            .set(splits_on_fetch as i64);
646                                    }
647                                }
648                                // Receiving file assignments from upstream list executor,
649                                // store into state table.
650                                Message::Chunk(chunk) => {
651                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
652                                        .data_chunk()
653                                        .rows()
654                                        .map(|row| {
655                                            let file_name = row.datum_at(0).unwrap().into_utf8();
656                                            let split = row.datum_at(1).unwrap().into_jsonb();
657                                            (file_name.to_owned(), split.to_owned_scalar())
658                                        })
659                                        .collect();
660                                    state_store_handler.set_states_json(jsonb_values).await?;
661                                    state_store_handler.try_flush().await?;
662                                }
663                                Message::Watermark(_) => unreachable!(),
664                            }
665                        }
666                        // StreamChunk from FsSourceReader, and the reader reads only one file.
667                        Either::Right(ChunksWithState {
668                            chunks,
669                            data_file_path,
670                            last_read_pos: _,
671                        }) => {
672                            // TODO: support persist progress after supporting reading part of a file.
673                            if true {
674                                splits_on_fetch = splits_on_fetch.saturating_sub(1);
675                                state_store_handler.delete(&data_file_path).await?;
676                                iceberg_metrics
677                                    .iceberg_source_inflight_file_count
678                                    .with_guarded_label_values(&metrics_labels)
679                                    .set(splits_on_fetch as i64);
680                            }
681
682                            for chunk in &chunks {
683                                let chunk = prune_additional_cols(
684                                    chunk,
685                                    &[file_path_idx, file_pos_idx],
686                                    &source_desc.columns,
687                                );
688                                // pad row_id
689                                let (chunk, op) = chunk.into_parts();
690                                let (mut columns, visibility) = chunk.into_parts();
691                                columns.insert(
692                                    row_id_idx,
693                                    Arc::new(
694                                        SerialArray::from_iter_bitmap(
695                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
696                                            Bitmap::zeros(columns[0].len()),
697                                        )
698                                        .into(),
699                                    ),
700                                );
701                                let chunk = StreamChunk::from_parts(
702                                    op,
703                                    DataChunk::from_parts(columns.into(), visibility),
704                                );
705
706                                yield Message::Chunk(chunk);
707                            }
708                        }
709                    }
710                }
711            }
712        }
713    }
714}
715
716impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
717    fn execute(self: Box<Self>) -> BoxedMessageStream {
718        self.into_stream().boxed()
719    }
720}
721
722impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
723    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
724        if let Some(core) = &self.stream_source_core {
725            f.debug_struct("IcebergFetchExecutor")
726                .field("source_id", &core.source_id)
727                .field("column_ids", &core.column_ids)
728                .finish()
729        } else {
730            f.debug_struct("IcebergFetchExecutor").finish()
731        }
732    }
733}
734
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::array::{DataChunk, Op, StreamChunk};

    use super::ChunksWithState;

    /// Models the zero-row filtering described for `build_batched_stream_reader`.
    ///
    /// Each entry of `batch_cardinalities` becomes a dummy `DataChunk` with that many rows.
    /// Batches with zero rows are dropped. Every surviving batch is wrapped into an
    /// all-`Insert` `StreamChunk`.
    ///
    /// NOTE(review): this re-states the reader's filter rather than calling it; keep it in
    /// sync with `build_batched_stream_reader` if that logic changes.
    fn keep_non_empty_chunks(batch_cardinalities: &[usize]) -> Vec<StreamChunk> {
        batch_cardinalities
            .iter()
            .map(|&cardinality| DataChunk::new_dummy(cardinality))
            .filter(|data_chunk| data_chunk.cardinality() > 0)
            .map(|data_chunk| {
                let ops = itertools::repeat_n(Op::Insert, data_chunk.cardinality()).collect_vec();
                StreamChunk::from_parts(ops, data_chunk)
            })
            .collect()
    }

    /// Verifies the `ChunksWithState` contract for the empty-task case.
    ///
    /// When a `FileScanTask` produces no rows, `build_batched_stream_reader` yields a
    /// `ChunksWithState` with an empty `chunks` vec. This covers an empty data file, or an
    /// Iceberg file fully covered by equality-delete files. The `data_file_path` is taken
    /// directly from the task rather than the yield being skipped entirely.
    ///
    /// Skipping the yield would prevent `into_stream` from calling
    /// `state_store_handler.delete(&data_file_path)` and decrementing `splits_on_fetch`,
    /// leaving the file assignment stuck in the state table.
    ///
    /// This test verifies the two properties that `into_stream` depends on:
    /// 1. An empty `ChunksWithState` forwards no data downstream (the chunk loop is a
    ///    no-op), so no spurious rows appear.
    /// 2. `data_file_path` is populated so the state entry can be cleaned up.
    #[test]
    fn test_empty_chunks_with_state_satisfies_into_stream_contract() {
        let path = "s3://bucket/empty.parquet".to_owned();

        // Simulate what build_batched_stream_reader yields for an empty task.
        let cws = ChunksWithState {
            chunks: vec![],
            data_file_path: path.clone(),
            last_read_pos: None,
        };

        // Property 1: no data is forwarded downstream.
        assert!(
            cws.chunks.is_empty(),
            "empty ChunksWithState must not forward any rows"
        );

        // Property 2: data_file_path is set so the state entry can be deleted.
        assert_eq!(
            cws.data_file_path, path,
            "data_file_path must match the original task path"
        );
    }

    /// Verifies that a non-empty `ChunksWithState` carries its chunks unmodified.
    #[test]
    fn test_non_empty_chunks_with_state() {
        let chunk = StreamChunk::from_parts(
            vec![Op::Insert, Op::Insert, Op::Insert],
            DataChunk::new_dummy(3),
        );
        let cws = ChunksWithState {
            chunks: vec![chunk],
            data_file_path: "s3://bucket/data.parquet".to_owned(),
            last_read_pos: None,
        };

        assert_eq!(cws.chunks.len(), 1);
        assert_eq!(cws.chunks[0].cardinality(), 3);
    }

    /// Verifies that zero-cardinality chunks are excluded from the `chunks` vec.
    ///
    /// `scan_task_to_chunk_with_deletes` can emit zero-row `RecordBatch`es after
    /// predicate or equality-delete filtering. Suppose such a chunk were pushed into
    /// `chunks` and then chosen as `chunks.last()`. The subsequent `cardinality() - 1` call
    /// would then underflow on `usize` and panic.
    ///
    /// The fix is to skip those chunks before pushing, relying on the `task_data_file_path`
    /// fallback to cover the all-empty case. Both the mixed case and the all-empty case are
    /// exercised here.
    #[test]
    fn test_zero_cardinality_chunks_are_excluded() {
        let path = "s3://bucket/mostly-deleted.parquet".to_owned();

        // Mixed case: a trailing fully-deleted batch would otherwise become `chunks.last()`.
        let chunks = keep_non_empty_chunks(&[0, 3, 0]);
        assert_eq!(
            chunks.len(),
            1,
            "zero-cardinality chunks must not be added to the chunks vec"
        );
        assert!(chunks.iter().all(|chunk| chunk.cardinality() > 0));
        let last = chunks
            .last()
            .expect("the non-empty batch must survive the filter");
        // The last-read-position computation (`cardinality() - 1`) must not underflow.
        assert_eq!(last.cardinality().checked_sub(1), Some(2));

        // All-empty case: nothing survives, so the fallback path kicks in.
        let chunks = keep_non_empty_chunks(&[0, 0]);
        assert!(
            chunks.is_empty(),
            "zero-cardinality chunk must not be added to the chunks vec"
        );

        // Simulate the fallback: data_file_path comes from the task.
        let cws = ChunksWithState {
            chunks,
            data_file_path: path.clone(),
            last_read_pos: None,
        };

        // State cleanup can still run.
        assert_eq!(
            cws.data_file_path, path,
            "data_file_path must be set even when all chunks are zero-cardinality"
        );
        // No spurious rows forwarded downstream.
        assert!(cws.chunks.is_empty());
    }
}