risingwave_stream/executor/source/iceberg_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
32use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
33use risingwave_connector::source::reader::desc::SourceDesc;
34use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
35use risingwave_pb::common::ThrottleType;
36use risingwave_storage::store::PrefetchOptions;
37use thiserror_ext::AsReport;
38
39use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
40use crate::common::rate_limit::limited_chunk_size;
41use crate::executor::prelude::*;
42use crate::executor::stream_reader::StreamReaderWithPause;
43
44/// An executor that fetches data from Iceberg tables.
45///
46/// This executor works with an upstream list executor that provides the list of files to read.
47/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
48/// downstream.
49pub struct IcebergFetchExecutor<S: StateStore> {
50    actor_ctx: ActorContextRef,
51
52    /// Core component for managing external streaming source state
53    stream_source_core: Option<StreamSourceCore<S>>,
54
55    /// Upstream list executor that provides the list of files to read.
56    /// This executor is responsible for discovering new files and changes in the Iceberg table.
57    upstream: Option<Executor>,
58
59    /// Optional rate limit in rows/s to control data ingestion speed
60    rate_limit_rps: Option<u32>,
61
62    /// Configuration for streaming operations, including Iceberg-specific settings
63    streaming_config: Arc<StreamingConfig>,
64}
65
66/// Fetched data from 1 [`FileScanTask`], along with states for checkpointing.
67///
68/// Currently 1 `FileScanTask` -> 1 `ChunksWithState`.
69/// Later after we support reading part of a file, we will support 1 `FileScanTask` -> n `ChunksWithState`.
70pub(crate) struct ChunksWithState {
71    /// The actual data chunks read from the file
72    pub chunks: Vec<StreamChunk>,
73
74    /// Path to the data file, used for checkpointing and error reporting.
75    pub data_file_path: String,
76
77    /// The last read position in the file, used for checkpointing.
78    #[expect(dead_code)]
79    pub last_read_pos: Datum,
80}
81
82pub(super) use state::PersistedFileScanTask;
83mod state {
84    use std::sync::Arc;
85
86    use anyhow::Context;
87    use iceberg::expr::BoundPredicate;
88    use iceberg::scan::FileScanTask;
89    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
90    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
91    use serde::{Deserialize, Serialize};
92
93    use crate::executor::StreamExecutorResult;
94
95    fn default_case_sensitive() -> bool {
96        true
97    }
98    /// This corresponds to the actually persisted `FileScanTask` in the state table.
99    ///
100    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
101    /// Currently, they have the same definition.
102    ///
103    /// We can handle possible compatibility issues in [`Self::from_task`] and [`Self::to_task`].
104    /// A version id needs to be introduced then.
105    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
106    pub struct PersistedFileScanTask {
107        /// The start offset of the file to scan.
108        pub start: u64,
109        /// The length of the file to scan.
110        pub length: u64,
111        /// The number of records in the file to scan.
112        ///
113        /// This is an optional field, and only available if we are
114        /// reading the entire data file.
115        pub record_count: Option<u64>,
116
117        /// The data file path corresponding to the task.
118        pub data_file_path: String,
119
120        /// The content type of the file to scan.
121        pub data_file_content: DataContentType,
122
123        /// The format of the file to scan.
124        pub data_file_format: DataFileFormat,
125
126        /// The schema of the file to scan.
127        pub schema: SchemaRef,
128        /// The field ids to project.
129        pub project_field_ids: Vec<i32>,
130        /// The predicate to filter.
131        #[serde(skip_serializing_if = "Option::is_none")]
132        pub predicate: Option<BoundPredicate>,
133
134        /// The list of delete files that may need to be applied to this data file
135        pub deletes: Vec<PersistedFileScanTask>,
136        /// sequence number
137        pub sequence_number: i64,
138        /// equality ids
139        pub equality_ids: Option<Vec<i32>>,
140
141        pub file_size_in_bytes: u64,
142
143        #[serde(default = "default_case_sensitive")]
144        pub case_sensitive: bool,
145    }
146
147    impl PersistedFileScanTask {
148        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
149        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
150            let persisted_task: Self =
151                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
152                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
153            Ok(Self::to_task(persisted_task))
154        }
155
156        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
157        pub fn encode(task: FileScanTask) -> JsonbVal {
158            let persisted_task = Self::from_task(task);
159            serde_json::to_value(persisted_task).unwrap().into()
160        }
161
162        /// Converts a persisted task to a [`FileScanTask`].
163        fn to_task(
164            Self {
165                start,
166                length,
167                record_count,
168                data_file_path,
169                data_file_content,
170                data_file_format,
171                schema,
172                project_field_ids,
173                predicate,
174                deletes,
175                sequence_number,
176                equality_ids,
177                file_size_in_bytes,
178                case_sensitive,
179            }: Self,
180        ) -> FileScanTask {
181            FileScanTask {
182                start,
183                length,
184                record_count,
185                data_file_path,
186                data_file_content,
187                data_file_format,
188                schema,
189                project_field_ids,
190                predicate,
191                deletes: deletes
192                    .into_iter()
193                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
194                    .collect(),
195                sequence_number,
196                equality_ids,
197                file_size_in_bytes,
198                partition: None,
199                partition_spec: None,
200                name_mapping: None,
201                case_sensitive,
202            }
203        }
204
205        /// Changes a [`FileScanTask`] to a persisted one.
206        fn from_task(
207            FileScanTask {
208                start,
209                length,
210                record_count,
211                data_file_path,
212                data_file_content,
213                data_file_format,
214                schema,
215                project_field_ids,
216                predicate,
217                deletes,
218                sequence_number,
219                equality_ids,
220                file_size_in_bytes,
221                case_sensitive,
222                ..
223            }: FileScanTask,
224        ) -> Self {
225            Self {
226                start,
227                length,
228                record_count,
229                data_file_path,
230                data_file_content,
231                data_file_format,
232                schema,
233                project_field_ids,
234                predicate,
235                deletes: deletes
236                    .into_iter()
237                    .map(PersistedFileScanTask::from_task_ref)
238                    .collect(),
239                sequence_number,
240                equality_ids,
241                file_size_in_bytes,
242                case_sensitive,
243            }
244        }
245
246        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
247            Self {
248                start: task.start,
249                length: task.length,
250                record_count: task.record_count,
251                data_file_path: task.data_file_path.clone(),
252                data_file_content: task.data_file_content,
253                data_file_format: task.data_file_format,
254                schema: task.schema.clone(),
255                project_field_ids: task.project_field_ids.clone(),
256                predicate: task.predicate.clone(),
257                deletes: task
258                    .deletes
259                    .iter()
260                    .cloned()
261                    .map(PersistedFileScanTask::from_task_ref)
262                    .collect(),
263                sequence_number: task.sequence_number,
264                equality_ids: task.equality_ids.clone(),
265                file_size_in_bytes: task.file_size_in_bytes,
266                case_sensitive: task.case_sensitive,
267            }
268        }
269    }
270}
271
272impl<S: StateStore> IcebergFetchExecutor<S> {
273    pub fn new(
274        actor_ctx: ActorContextRef,
275        stream_source_core: StreamSourceCore<S>,
276        upstream: Executor,
277        rate_limit_rps: Option<u32>,
278        streaming_config: Arc<StreamingConfig>,
279    ) -> Self {
280        Self {
281            actor_ctx,
282            stream_source_core: Some(stream_source_core),
283            upstream: Some(upstream),
284            rate_limit_rps,
285            streaming_config,
286        }
287    }
288
289    #[expect(clippy::too_many_arguments)]
290    async fn replace_with_new_batch_reader<const BIASED: bool>(
291        splits_on_fetch: &mut usize,
292        state_store_handler: &SourceStateTableHandler<S>,
293        column_ids: Vec<ColumnId>,
294        source_ctx: SourceContext,
295        source_desc: SourceDesc,
296        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
297        rate_limit_rps: Option<u32>,
298        streaming_config: Arc<StreamingConfig>,
299    ) -> StreamExecutorResult<()> {
300        let mut batch =
301            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
302        let state_table = state_store_handler.state_table();
303        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
304            let table_iter = state_table
305                .iter_with_vnode(
306                    vnode,
307                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
308                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
309                    PrefetchOptions::prefetch_for_small_range_scan(),
310                )
311                .await?;
312            pin_mut!(table_iter);
313            while let Some(item) = table_iter.next().await {
314                let row = item?;
315                let task = match row.datum_at(1) {
316                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
317                        PersistedFileScanTask::decode(jsonb_ref)?
318                    }
319                    _ => unreachable!(),
320                };
321                batch.push(task);
322
323                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
324                    break 'vnodes;
325                }
326            }
327        }
328        if batch.is_empty() {
329            stream.replace_data_stream(stream::pending().boxed());
330        } else {
331            *splits_on_fetch += batch.len();
332            let batch_reader = Self::build_batched_stream_reader(
333                column_ids,
334                source_ctx,
335                source_desc,
336                batch,
337                rate_limit_rps,
338                streaming_config,
339            )
340            .map_err(StreamExecutorError::connector_error);
341            stream.replace_data_stream(batch_reader);
342        }
343
344        Ok(())
345    }
346
347    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
348    async fn build_batched_stream_reader(
349        _column_ids: Vec<ColumnId>,
350        _source_ctx: SourceContext,
351        source_desc: SourceDesc,
352        batch: Vec<FileScanTask>,
353        _rate_limit_rps: Option<u32>,
354        streaming_config: Arc<StreamingConfig>,
355    ) {
356        let file_path_idx = source_desc
357            .columns
358            .iter()
359            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
360            .unwrap();
361        let file_pos_idx = source_desc
362            .columns
363            .iter()
364            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
365            .unwrap();
366        let properties = source_desc.source.config.clone();
367        let properties = match properties {
368            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
369                iceberg_properties
370            }
371            _ => unreachable!(),
372        };
373        let table = properties.load_table().await?;
374        let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
375
376        for task in batch {
377            // Capture the file path upfront from the task so we can use it even when the
378            // scan produces no chunks (empty data file or fully equality-deleted file).
379            let task_data_file_path = task.data_file_path.clone();
380            let mut chunks = vec![];
381            #[for_await]
382            for chunk in scan_task_to_chunk_with_deletes(
383                table.clone(),
384                task,
385                IcebergScanOpts {
386                    chunk_size: streaming_config.developer.chunk_size,
387                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
388                    need_file_path_and_pos: true,
389                    handle_delete_files: table.metadata().format_version()
390                        >= iceberg::spec::FormatVersion::V3,
391                },
392                Some(metrics.clone()),
393            ) {
394                let chunk = chunk?;
395                // Skip zero-cardinality chunks: a RecordBatch with 0 visible rows after
396                // predicate/delete filtering would cause `cardinality() - 1` to underflow
397                // below when we extract the last-row metadata. Skipping here is safe
398                // because the existing `task_data_file_path` fallback already covers
399                // the case where no readable rows are produced.
400                if chunk.cardinality() == 0 {
401                    continue;
402                }
403                chunks.push(StreamChunk::from_parts(
404                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
405                    chunk,
406                ));
407            }
408            // We yield once for each file now, because iceberg-rs doesn't support read part of a file now.
409            // We must always yield — even for an empty task — so that `into_stream` can
410            // decrement `splits_on_fetch` and delete the file assignment from the state
411            // table.  Skipping the yield (e.g. with `continue`) would leave the file
412            // stuck in the state table and prevent subsequent batches from progressing.
413            let (data_file_path, last_read_pos) = if let Some(last_chunk) = chunks.last() {
414                let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
415                let path = last_row
416                    .datum_at(file_path_idx)
417                    .unwrap()
418                    .into_utf8()
419                    .to_owned();
420                let pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
421                (path, pos)
422            } else {
423                // No rows were produced: fall back to the task's own path so the
424                // consumer can still remove the state entry for this file.
425                (task_data_file_path, None)
426            };
427            yield ChunksWithState {
428                chunks,
429                data_file_path,
430                last_read_pos,
431            };
432        }
433    }
434
435    fn build_source_ctx(
436        &self,
437        source_desc: &SourceDesc,
438        source_id: SourceId,
439        source_name: &str,
440    ) -> SourceContext {
441        SourceContext::new(
442            self.actor_ctx.id,
443            source_id,
444            self.actor_ctx.fragment_id,
445            source_name.to_owned(),
446            source_desc.metrics.clone(),
447            SourceCtrlOpts {
448                chunk_size: limited_chunk_size(self.rate_limit_rps),
449                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
450            },
451            source_desc.source.config.clone(),
452            None,
453        )
454    }
455
456    #[try_stream(ok = Message, error = StreamExecutorError)]
457    async fn into_stream(mut self) {
458        let mut upstream = self.upstream.take().unwrap().execute();
459        let barrier = expect_first_barrier(&mut upstream).await?;
460        let first_epoch = barrier.epoch;
461        let is_pause_on_startup = barrier.is_pause_on_startup();
462        yield Message::Barrier(barrier);
463
464        let mut core = self.stream_source_core.take().unwrap();
465        let mut state_store_handler = core.split_state_store;
466
467        // Build source description from the builder.
468        let source_desc_builder = core.source_desc_builder.take().unwrap();
469
470        let source_desc = source_desc_builder
471            .build()
472            .map_err(StreamExecutorError::connector_error)?;
473
474        let file_path_idx = source_desc
475            .columns
476            .iter()
477            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
478            .unwrap();
479        let file_pos_idx = source_desc
480            .columns
481            .iter()
482            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
483            .unwrap();
484        // TODO: currently we generate row_id here. If for risingwave iceberg table engine, maybe we can use _risingwave_iceberg_row_id instead.
485        let row_id_idx = source_desc
486            .columns
487            .iter()
488            .position(|c| c.name == ROW_ID_COLUMN_NAME)
489            .unwrap();
490        tracing::trace!(
491            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
492            source_desc.columns,
493            file_path_idx,
494            file_pos_idx,
495            row_id_idx
496        );
497        // Initialize state table.
498        state_store_handler.init_epoch(first_epoch).await?;
499
500        // Extract table name from iceberg properties for metrics labeling.
501        let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
502        let iceberg_table_name = {
503            match &source_desc.source.config {
504                risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
505                    props.table.table_name().to_owned()
506                }
507                _ => unreachable!("IcebergFetchExecutor must be built with Iceberg properties"),
508            }
509        };
510        let source_id_str = core.source_id.to_string();
511        let source_name_str = core.source_name.clone();
512        let metrics_labels = [
513            source_id_str.as_str(),
514            source_name_str.as_str(),
515            iceberg_table_name.as_str(),
516        ];
517
518        let mut splits_on_fetch: usize = 0;
519        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
520            upstream,
521            stream::pending().boxed(),
522        );
523
524        if is_pause_on_startup {
525            stream.pause_stream();
526        }
527
528        // If it is a recovery startup,
529        // there can be file assignments in the state table.
530        // Hence we try building a reader first.
531        Self::replace_with_new_batch_reader(
532            &mut splits_on_fetch,
533            &state_store_handler, // move into the function
534            core.column_ids.clone(),
535            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
536            source_desc.clone(),
537            &mut stream,
538            self.rate_limit_rps,
539            self.streaming_config.clone(),
540        )
541        .await?;
542        iceberg_metrics
543            .iceberg_source_inflight_file_count
544            .with_guarded_label_values(&metrics_labels)
545            .set(splits_on_fetch as i64);
546
547        while let Some(msg) = stream.next().await {
548            match msg {
549                Err(e) => {
550                    tracing::error!(error = %e.as_report(), "Fetch Error");
551                    iceberg_metrics
552                        .iceberg_source_scan_errors_total
553                        .with_guarded_label_values(&[
554                            metrics_labels[0],
555                            metrics_labels[1],
556                            metrics_labels[2],
557                            "fetch_error",
558                        ])
559                        .inc();
560                    splits_on_fetch = 0;
561                    iceberg_metrics
562                        .iceberg_source_inflight_file_count
563                        .with_guarded_label_values(&metrics_labels)
564                        .set(0);
565                }
566                Ok(msg) => {
567                    match msg {
568                        // This branch will be preferred.
569                        Either::Left(msg) => {
570                            match msg {
571                                Message::Barrier(barrier) => {
572                                    let mut need_rebuild_reader = false;
573
574                                    if let Some(mutation) = barrier.mutation.as_deref() {
575                                        match mutation {
576                                            Mutation::Pause => stream.pause_stream(),
577                                            Mutation::Resume => stream.resume_stream(),
578                                            Mutation::Throttle(fragment_to_apply) => {
579                                                if let Some(entry) = fragment_to_apply
580                                                    .get(&self.actor_ctx.fragment_id)
581                                                    && entry.throttle_type() == ThrottleType::Source
582                                                    && entry.rate_limit != self.rate_limit_rps
583                                                {
584                                                    tracing::debug!(
585                                                        "updating rate limit from {:?} to {:?}",
586                                                        self.rate_limit_rps,
587                                                        entry.rate_limit
588                                                    );
589                                                    self.rate_limit_rps = entry.rate_limit;
590                                                    need_rebuild_reader = true;
591                                                }
592                                            }
593                                            _ => (),
594                                        }
595                                    }
596
597                                    let post_commit = state_store_handler
598                                        .commit_may_update_vnode_bitmap(barrier.epoch)
599                                        .await?;
600
601                                    let update_vnode_bitmap =
602                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
603                                    // Propagate the barrier.
604                                    yield Message::Barrier(barrier);
605
606                                    if post_commit
607                                        .post_yield_barrier(update_vnode_bitmap)
608                                        .await?
609                                        .is_some()
610                                    {
611                                        // Vnode bitmap update changes which file assignments this executor
612                                        // should read. Rebuild the reader to avoid reading splits that no
613                                        // longer belong to this actor (e.g., during scale-out).
614                                        splits_on_fetch = 0;
615                                    }
616
617                                    if splits_on_fetch == 0 || need_rebuild_reader {
618                                        Self::replace_with_new_batch_reader(
619                                            &mut splits_on_fetch,
620                                            &state_store_handler,
621                                            core.column_ids.clone(),
622                                            self.build_source_ctx(
623                                                &source_desc,
624                                                core.source_id,
625                                                &core.source_name,
626                                            ),
627                                            source_desc.clone(),
628                                            &mut stream,
629                                            self.rate_limit_rps,
630                                            self.streaming_config.clone(),
631                                        )
632                                        .await?;
633                                        iceberg_metrics
634                                            .iceberg_source_inflight_file_count
635                                            .with_guarded_label_values(&metrics_labels)
636                                            .set(splits_on_fetch as i64);
637                                    }
638                                }
639                                // Receiving file assignments from upstream list executor,
640                                // store into state table.
641                                Message::Chunk(chunk) => {
642                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
643                                        .data_chunk()
644                                        .rows()
645                                        .map(|row| {
646                                            let file_name = row.datum_at(0).unwrap().into_utf8();
647                                            let split = row.datum_at(1).unwrap().into_jsonb();
648                                            (file_name.to_owned(), split.to_owned_scalar())
649                                        })
650                                        .collect();
651                                    state_store_handler.set_states_json(jsonb_values).await?;
652                                    state_store_handler.try_flush().await?;
653                                }
654                                Message::Watermark(_) => unreachable!(),
655                            }
656                        }
657                        // StreamChunk from FsSourceReader, and the reader reads only one file.
658                        Either::Right(ChunksWithState {
659                            chunks,
660                            data_file_path,
661                            last_read_pos: _,
662                        }) => {
663                            // TODO: support persist progress after supporting reading part of a file.
664                            if true {
665                                splits_on_fetch = splits_on_fetch.saturating_sub(1);
666                                state_store_handler.delete(&data_file_path).await?;
667                                iceberg_metrics
668                                    .iceberg_source_inflight_file_count
669                                    .with_guarded_label_values(&metrics_labels)
670                                    .set(splits_on_fetch as i64);
671                            }
672
673                            for chunk in &chunks {
674                                let chunk = prune_additional_cols(
675                                    chunk,
676                                    &[file_path_idx, file_pos_idx],
677                                    &source_desc.columns,
678                                );
679                                // pad row_id
680                                let (chunk, op) = chunk.into_parts();
681                                let (mut columns, visibility) = chunk.into_parts();
682                                columns.insert(
683                                    row_id_idx,
684                                    Arc::new(
685                                        SerialArray::from_iter_bitmap(
686                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
687                                            Bitmap::zeros(columns[0].len()),
688                                        )
689                                        .into(),
690                                    ),
691                                );
692                                let chunk = StreamChunk::from_parts(
693                                    op,
694                                    DataChunk::from_parts(columns.into(), visibility),
695                                );
696
697                                yield Message::Chunk(chunk);
698                            }
699                        }
700                    }
701                }
702            }
703        }
704    }
705}
706
707impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
708    fn execute(self: Box<Self>) -> BoxedMessageStream {
709        self.into_stream().boxed()
710    }
711}
712
713impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
714    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
715        if let Some(core) = &self.stream_source_core {
716            f.debug_struct("IcebergFetchExecutor")
717                .field("source_id", &core.source_id)
718                .field("column_ids", &core.column_ids)
719                .finish()
720        } else {
721            f.debug_struct("IcebergFetchExecutor").finish()
722        }
723    }
724}
725
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::array::{DataChunk, Op, StreamChunk};

    use super::ChunksWithState;

    /// Checks the `ChunksWithState` contract for a task that yields no rows.
    ///
    /// A `FileScanTask` can produce nothing at all, for example an empty data file or a
    /// file whose rows are all removed by equality deletes. In that case
    /// `build_batched_stream_reader` still yields a `ChunksWithState`. It carries an empty
    /// `chunks` vec and the task's own `data_file_path`, instead of yielding nothing.
    ///
    /// If nothing were yielded, `into_stream` would never call
    /// `state_store_handler.delete(&data_file_path)` or decrement `splits_on_fetch`. The
    /// file assignment would then stay in the state table forever.
    ///
    /// `into_stream` relies on two properties, both checked here:
    /// 1. With no chunks, the per-chunk forwarding loop does nothing, so no rows go downstream.
    /// 2. `data_file_path` is filled in, so the state entry can still be cleaned up.
    #[test]
    fn test_empty_chunks_with_state_satisfies_into_stream_contract() {
        let expected_path = String::from("s3://bucket/empty.parquet");

        // Mirror the value `build_batched_stream_reader` produces for a row-less task.
        let state = ChunksWithState {
            chunks: Vec::new(),
            data_file_path: expected_path.clone(),
            last_read_pos: None,
        };

        // Property 1: the forwarding loop would have nothing to iterate over.
        assert!(
            state.chunks.iter().next().is_none(),
            "empty ChunksWithState must not forward any rows"
        );

        // Property 2: the path is available for deleting the state-table entry.
        assert_eq!(
            state.data_file_path, expected_path,
            "data_file_path must match the original task path"
        );
    }

    /// Checks that a `ChunksWithState` holding data keeps its chunks as given.
    #[test]
    fn test_non_empty_chunks_with_state() {
        let ops = vec![Op::Insert; 3];
        let single_chunk = StreamChunk::from_parts(ops, DataChunk::new_dummy(3));

        let state = ChunksWithState {
            chunks: vec![single_chunk],
            data_file_path: String::from("s3://bucket/data.parquet"),
            last_read_pos: None,
        };

        assert_eq!(state.chunks.len(), 1);
        assert_eq!(state.chunks[0].cardinality(), 3);
    }

    /// Checks that chunks with zero rows never enter the `chunks` vec.
    ///
    /// `scan_task_to_chunk_with_deletes` may emit empty `RecordBatch`es once predicates or
    /// equality deletes have been applied. Suppose such a chunk were stored and later picked
    /// as `chunks.last()`. Its `cardinality() - 1` would underflow `usize` and panic.
    ///
    /// The reader therefore drops those chunks before pushing. When every chunk is dropped,
    /// it falls back to `task_data_file_path`.
    #[test]
    fn test_zero_cardinality_chunks_are_excluded() {
        let expected_path = String::from("s3://bucket/mostly-deleted.parquet");

        // Mirror the zero-row filter from `build_batched_stream_reader`. A data chunk is kept
        // only if it has at least one row, as a fully-deleted batch would not.
        let candidate = DataChunk::new_dummy(0);
        let chunks: Vec<StreamChunk> = std::iter::once(candidate)
            .filter(|data| data.cardinality() > 0)
            .map(|data| {
                let ops = itertools::repeat_n(Op::Insert, data.cardinality()).collect_vec();
                StreamChunk::from_parts(ops, data)
            })
            .collect();

        // Nothing survived the filter, so the fallback path is what applies.
        assert!(
            chunks.is_empty(),
            "zero-cardinality chunk must not be added to the chunks vec"
        );

        // Mirror the fallback, which takes the data file path from the task itself.
        let state = ChunksWithState {
            chunks,
            data_file_path: expected_path.clone(),
            last_read_pos: None,
        };

        // State cleanup is still possible...
        assert_eq!(
            state.data_file_path, expected_path,
            "data_file_path must be set even when all chunks are zero-cardinality"
        );
        // ...and no rows are sent downstream.
        assert!(state.chunks.is_empty());
    }
}