// risingwave_stream/executor/source/iceberg_fetch_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use either::Either;
18use futures::{StreamExt, TryStreamExt, stream};
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use itertools::Itertools;
22use risingwave_common::array::{DataChunk, Op, SerialArray};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{
25    ColumnId, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ROW_ID_COLUMN_NAME,
26};
27use risingwave_common::config::StreamingConfig;
28use risingwave_common::hash::VnodeBitmapExt;
29use risingwave_common::id::SourceId;
30use risingwave_common::types::{JsonbVal, ScalarRef, Serial, ToOwnedDatum};
31use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
32use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
33use risingwave_connector::source::reader::desc::SourceDesc;
34use risingwave_connector::source::{SourceContext, SourceCtrlOpts};
35use risingwave_pb::common::ThrottleType;
36use risingwave_storage::store::PrefetchOptions;
37use thiserror_ext::AsReport;
38
39use super::{SourceStateTableHandler, StreamSourceCore, prune_additional_cols};
40use crate::common::rate_limit::limited_chunk_size;
41use crate::executor::prelude::*;
42use crate::executor::stream_reader::StreamReaderWithPause;
43
44/// An executor that fetches data from Iceberg tables.
45///
46/// This executor works with an upstream list executor that provides the list of files to read.
47/// It reads data from Iceberg files in batches, converts them to stream chunks, and passes them
48/// downstream.
pub struct IcebergFetchExecutor<S: StateStore> {
    /// Context of the containing actor; provides the actor id and fragment id
    /// used for source contexts and barrier mutations.
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    /// Wrapped in `Option` because it is taken out (`take()`) once when the
    /// executor is turned into a stream.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    /// Wrapped in `Option` because it is taken out once on stream start.
    upstream: Option<Executor>,

    /// Optional rate limit in rows/s to control data ingestion speed
    rate_limit_rps: Option<u32>,

    /// Configuration for streaming operations, including Iceberg-specific settings
    /// (e.g. `iceberg_fetch_batch_size`, `chunk_size`).
    streaming_config: Arc<StreamingConfig>,
}
65
66/// Fetched data from 1 [`FileScanTask`], along with states for checkpointing.
67///
68/// Currently 1 `FileScanTask` -> 1 `ChunksWithState`.
69/// Later after we support reading part of a file, we will support 1 `FileScanTask` -> n `ChunksWithState`.
pub(crate) struct ChunksWithState {
    /// The actual data chunks read from the file
    pub chunks: Vec<StreamChunk>,

    /// Path to the data file, used for checkpointing and error reporting.
    pub data_file_path: String,

    /// The last read position in the file, used for checkpointing.
    // Currently unused because a whole file is always read in one go; kept for
    // the planned partial-file-read support (hence the `dead_code` expectation).
    #[expect(dead_code)]
    pub last_read_pos: Datum,
}
81
pub(super) use state::PersistedFileScanTask;
mod state {
    use std::sync::Arc;

    use anyhow::Context;
    use iceberg::expr::BoundPredicate;
    use iceberg::scan::FileScanTask;
    use iceberg::spec::{DataContentType, DataFileFormat, SchemaRef};
    use risingwave_common::types::{JsonbRef, JsonbVal, ScalarRef};
    use serde::{Deserialize, Serialize};

    use crate::executor::StreamExecutorResult;

    /// Serde default for `case_sensitive`: states persisted before this field
    /// existed deserialize as `true`, preserving the previous behavior.
    fn default_case_sensitive() -> bool {
        true
    }
    /// This corresponds to the actually persisted `FileScanTask` in the state table.
    ///
    /// We introduce this in case the definition of [`FileScanTask`] changes in the iceberg-rs crate.
    /// Currently, they have the same definition.
    ///
    /// We can handle possible compatibility issues in [`Self::from_task`] and [`Self::to_task`].
    /// A version id needs to be introduced then.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct PersistedFileScanTask {
        /// The start offset of the file to scan.
        pub start: u64,
        /// The length of the file to scan.
        pub length: u64,
        /// The number of records in the file to scan.
        ///
        /// This is an optional field, and only available if we are
        /// reading the entire data file.
        pub record_count: Option<u64>,

        /// The data file path corresponding to the task.
        pub data_file_path: String,

        /// The content type of the file to scan.
        pub data_file_content: DataContentType,

        /// The format of the file to scan.
        pub data_file_format: DataFileFormat,

        /// The schema of the file to scan.
        pub schema: SchemaRef,
        /// The field ids to project.
        pub project_field_ids: Vec<i32>,
        /// The predicate to filter.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub predicate: Option<BoundPredicate>,

        /// The list of delete files that may need to be applied to this data file
        pub deletes: Vec<PersistedFileScanTask>,
        /// sequence number
        pub sequence_number: i64,
        /// equality ids
        pub equality_ids: Option<Vec<i32>>,

        pub file_size_in_bytes: u64,

        #[serde(default = "default_case_sensitive")]
        pub case_sensitive: bool,
    }

    impl PersistedFileScanTask {
        /// First decodes the json to the struct, then converts the struct to a [`FileScanTask`].
        pub fn decode(jsonb_ref: JsonbRef<'_>) -> StreamExecutorResult<FileScanTask> {
            let persisted_task: Self =
                serde_json::from_value(jsonb_ref.to_owned_scalar().take())
                    .with_context(|| format!("invalid state: {:?}", jsonb_ref))?;
            Ok(Self::to_task(persisted_task))
        }

        /// First converts the [`FileScanTask`] to a persisted one, then encodes the persisted one to a jsonb value.
        pub fn encode(task: FileScanTask) -> JsonbVal {
            let persisted_task = Self::from_task(task);
            // Serialization of this plain data struct cannot fail, so `unwrap` is safe.
            serde_json::to_value(persisted_task).unwrap().into()
        }

        /// Converts a persisted task to a [`FileScanTask`].
        fn to_task(
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }: Self,
        ) -> FileScanTask {
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(|task| Arc::new(PersistedFileScanTask::to_task(task)))
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                // These fields are not persisted (see the `..` in `from_task`),
                // so they are restored as `None` here.
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive,
            }
        }

        /// Changes a [`FileScanTask`] to a persisted one.
        fn from_task(
            FileScanTask {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes,
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
                // `partition`, `partition_spec` and `name_mapping` are
                // intentionally dropped; `to_task` restores them as `None`.
                ..
            }: FileScanTask,
        ) -> Self {
            Self {
                start,
                length,
                record_count,
                data_file_path,
                data_file_content,
                data_file_format,
                schema,
                project_field_ids,
                predicate,
                deletes: deletes
                    .into_iter()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number,
                equality_ids,
                file_size_in_bytes,
                case_sensitive,
            }
        }

        /// Like [`Self::from_task`], but converts from a shared `Arc<FileScanTask>`
        /// (the type used for nested `deletes`), cloning each field out.
        fn from_task_ref(task: Arc<FileScanTask>) -> Self {
            Self {
                start: task.start,
                length: task.length,
                record_count: task.record_count,
                data_file_path: task.data_file_path.clone(),
                data_file_content: task.data_file_content,
                data_file_format: task.data_file_format,
                schema: task.schema.clone(),
                project_field_ids: task.project_field_ids.clone(),
                predicate: task.predicate.clone(),
                deletes: task
                    .deletes
                    .iter()
                    .cloned()
                    .map(PersistedFileScanTask::from_task_ref)
                    .collect(),
                sequence_number: task.sequence_number,
                equality_ids: task.equality_ids.clone(),
                file_size_in_bytes: task.file_size_in_bytes,
                case_sensitive: task.case_sensitive,
            }
        }
    }
}
271
272impl<S: StateStore> IcebergFetchExecutor<S> {
273    pub fn new(
274        actor_ctx: ActorContextRef,
275        stream_source_core: StreamSourceCore<S>,
276        upstream: Executor,
277        rate_limit_rps: Option<u32>,
278        streaming_config: Arc<StreamingConfig>,
279    ) -> Self {
280        Self {
281            actor_ctx,
282            stream_source_core: Some(stream_source_core),
283            upstream: Some(upstream),
284            rate_limit_rps,
285            streaming_config,
286        }
287    }
288
289    #[expect(clippy::too_many_arguments)]
290    async fn replace_with_new_batch_reader<const BIASED: bool>(
291        splits_on_fetch: &mut usize,
292        state_store_handler: &SourceStateTableHandler<S>,
293        column_ids: Vec<ColumnId>,
294        source_ctx: SourceContext,
295        source_desc: SourceDesc,
296        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
297        rate_limit_rps: Option<u32>,
298        streaming_config: Arc<StreamingConfig>,
299    ) -> StreamExecutorResult<()> {
300        let mut batch =
301            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
302        let state_table = state_store_handler.state_table();
303        'vnodes: for vnode in state_table.vnodes().iter_vnodes() {
304            let table_iter = state_table
305                .iter_with_vnode(
306                    vnode,
307                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
308                    // This usage is similar with `backfill`. So we only need to fetch a large data rather than establish a connection for a whole object.
309                    PrefetchOptions::prefetch_for_small_range_scan(),
310                )
311                .await?;
312            pin_mut!(table_iter);
313            while let Some(item) = table_iter.next().await {
314                let row = item?;
315                let task = match row.datum_at(1) {
316                    Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
317                        PersistedFileScanTask::decode(jsonb_ref)?
318                    }
319                    _ => unreachable!(),
320                };
321                batch.push(task);
322
323                if batch.len() >= streaming_config.developer.iceberg_fetch_batch_size as usize {
324                    break 'vnodes;
325                }
326            }
327        }
328        if batch.is_empty() {
329            stream.replace_data_stream(stream::pending().boxed());
330        } else {
331            *splits_on_fetch += batch.len();
332            let batch_reader = Self::build_batched_stream_reader(
333                column_ids,
334                source_ctx,
335                source_desc,
336                batch,
337                rate_limit_rps,
338                streaming_config,
339            )
340            .map_err(StreamExecutorError::connector_error);
341            stream.replace_data_stream(batch_reader);
342        }
343
344        Ok(())
345    }
346
347    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
348    async fn build_batched_stream_reader(
349        _column_ids: Vec<ColumnId>,
350        _source_ctx: SourceContext,
351        source_desc: SourceDesc,
352        batch: Vec<FileScanTask>,
353        _rate_limit_rps: Option<u32>,
354        streaming_config: Arc<StreamingConfig>,
355    ) {
356        let file_path_idx = source_desc
357            .columns
358            .iter()
359            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
360            .unwrap();
361        let file_pos_idx = source_desc
362            .columns
363            .iter()
364            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
365            .unwrap();
366        let properties = source_desc.source.config.clone();
367        let properties = match properties {
368            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
369                iceberg_properties
370            }
371            _ => unreachable!(),
372        };
373        let table = properties.load_table().await?;
374        let metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());
375
376        for task in batch {
377            let mut chunks = vec![];
378            #[for_await]
379            for chunk in scan_task_to_chunk_with_deletes(
380                table.clone(),
381                task,
382                IcebergScanOpts {
383                    chunk_size: streaming_config.developer.chunk_size,
384                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
385                    need_file_path_and_pos: true,
386                    handle_delete_files: table.metadata().format_version()
387                        >= iceberg::spec::FormatVersion::V3,
388                },
389                Some(metrics.clone()),
390            ) {
391                let chunk = chunk?;
392                chunks.push(StreamChunk::from_parts(
393                    itertools::repeat_n(Op::Insert, chunk.cardinality()).collect_vec(),
394                    chunk,
395                ));
396            }
397            // We yield once for each file now, because iceberg-rs doesn't support read part of a file now.
398            let last_chunk = chunks.last().unwrap();
399            let last_row = last_chunk.row_at(last_chunk.cardinality() - 1).1;
400            let data_file_path = last_row
401                .datum_at(file_path_idx)
402                .unwrap()
403                .into_utf8()
404                .to_owned();
405            let last_read_pos = last_row.datum_at(file_pos_idx).unwrap().to_owned_datum();
406            yield ChunksWithState {
407                chunks,
408                data_file_path,
409                last_read_pos,
410            };
411        }
412    }
413
414    fn build_source_ctx(
415        &self,
416        source_desc: &SourceDesc,
417        source_id: SourceId,
418        source_name: &str,
419    ) -> SourceContext {
420        SourceContext::new(
421            self.actor_ctx.id,
422            source_id,
423            self.actor_ctx.fragment_id,
424            source_name.to_owned(),
425            source_desc.metrics.clone(),
426            SourceCtrlOpts {
427                chunk_size: limited_chunk_size(self.rate_limit_rps),
428                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
429            },
430            source_desc.source.config.clone(),
431            None,
432        )
433    }
434
435    #[try_stream(ok = Message, error = StreamExecutorError)]
436    async fn into_stream(mut self) {
437        let mut upstream = self.upstream.take().unwrap().execute();
438        let barrier = expect_first_barrier(&mut upstream).await?;
439        let first_epoch = barrier.epoch;
440        let is_pause_on_startup = barrier.is_pause_on_startup();
441        yield Message::Barrier(barrier);
442
443        let mut core = self.stream_source_core.take().unwrap();
444        let mut state_store_handler = core.split_state_store;
445
446        // Build source description from the builder.
447        let source_desc_builder = core.source_desc_builder.take().unwrap();
448
449        let source_desc = source_desc_builder
450            .build()
451            .map_err(StreamExecutorError::connector_error)?;
452
453        let file_path_idx = source_desc
454            .columns
455            .iter()
456            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
457            .unwrap();
458        let file_pos_idx = source_desc
459            .columns
460            .iter()
461            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
462            .unwrap();
463        // TODO: currently we generate row_id here. If for risingwave iceberg table engine, maybe we can use _risingwave_iceberg_row_id instead.
464        let row_id_idx = source_desc
465            .columns
466            .iter()
467            .position(|c| c.name == ROW_ID_COLUMN_NAME)
468            .unwrap();
469        tracing::trace!(
470            "source_desc.columns: {:#?}, file_path_idx: {}, file_pos_idx: {}, row_id_idx: {}",
471            source_desc.columns,
472            file_path_idx,
473            file_pos_idx,
474            row_id_idx
475        );
476        // Initialize state table.
477        state_store_handler.init_epoch(first_epoch).await?;
478
479        // Extract table name from iceberg properties for metrics labeling.
480        let iceberg_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
481        let iceberg_table_name = {
482            match &source_desc.source.config {
483                risingwave_connector::source::ConnectorProperties::Iceberg(props) => {
484                    props.table.table_name().to_owned()
485                }
486                _ => unreachable!("IcebergFetchExecutor must be built with Iceberg properties"),
487            }
488        };
489        let source_id_str = core.source_id.to_string();
490        let source_name_str = core.source_name.clone();
491        let metrics_labels = [
492            source_id_str.as_str(),
493            source_name_str.as_str(),
494            iceberg_table_name.as_str(),
495        ];
496
497        let mut splits_on_fetch: usize = 0;
498        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
499            upstream,
500            stream::pending().boxed(),
501        );
502
503        if is_pause_on_startup {
504            stream.pause_stream();
505        }
506
507        // If it is a recovery startup,
508        // there can be file assignments in the state table.
509        // Hence we try building a reader first.
510        Self::replace_with_new_batch_reader(
511            &mut splits_on_fetch,
512            &state_store_handler, // move into the function
513            core.column_ids.clone(),
514            self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
515            source_desc.clone(),
516            &mut stream,
517            self.rate_limit_rps,
518            self.streaming_config.clone(),
519        )
520        .await?;
521        iceberg_metrics
522            .iceberg_source_inflight_file_count
523            .with_guarded_label_values(&metrics_labels)
524            .set(splits_on_fetch as i64);
525
526        while let Some(msg) = stream.next().await {
527            match msg {
528                Err(e) => {
529                    tracing::error!(error = %e.as_report(), "Fetch Error");
530                    iceberg_metrics
531                        .iceberg_source_scan_errors_total
532                        .with_guarded_label_values(&[
533                            metrics_labels[0],
534                            metrics_labels[1],
535                            metrics_labels[2],
536                            "fetch_error",
537                        ])
538                        .inc();
539                    splits_on_fetch = 0;
540                    iceberg_metrics
541                        .iceberg_source_inflight_file_count
542                        .with_guarded_label_values(&metrics_labels)
543                        .set(0);
544                }
545                Ok(msg) => {
546                    match msg {
547                        // This branch will be preferred.
548                        Either::Left(msg) => {
549                            match msg {
550                                Message::Barrier(barrier) => {
551                                    let mut need_rebuild_reader = false;
552
553                                    if let Some(mutation) = barrier.mutation.as_deref() {
554                                        match mutation {
555                                            Mutation::Pause => stream.pause_stream(),
556                                            Mutation::Resume => stream.resume_stream(),
557                                            Mutation::Throttle(fragment_to_apply) => {
558                                                if let Some(entry) = fragment_to_apply
559                                                    .get(&self.actor_ctx.fragment_id)
560                                                    && entry.throttle_type() == ThrottleType::Source
561                                                    && entry.rate_limit != self.rate_limit_rps
562                                                {
563                                                    tracing::debug!(
564                                                        "updating rate limit from {:?} to {:?}",
565                                                        self.rate_limit_rps,
566                                                        entry.rate_limit
567                                                    );
568                                                    self.rate_limit_rps = entry.rate_limit;
569                                                    need_rebuild_reader = true;
570                                                }
571                                            }
572                                            _ => (),
573                                        }
574                                    }
575
576                                    let post_commit = state_store_handler
577                                        .commit_may_update_vnode_bitmap(barrier.epoch)
578                                        .await?;
579
580                                    let update_vnode_bitmap =
581                                        barrier.as_update_vnode_bitmap(self.actor_ctx.id);
582                                    // Propagate the barrier.
583                                    yield Message::Barrier(barrier);
584
585                                    if post_commit
586                                        .post_yield_barrier(update_vnode_bitmap)
587                                        .await?
588                                        .is_some()
589                                    {
590                                        // Vnode bitmap update changes which file assignments this executor
591                                        // should read. Rebuild the reader to avoid reading splits that no
592                                        // longer belong to this actor (e.g., during scale-out).
593                                        splits_on_fetch = 0;
594                                    }
595
596                                    if splits_on_fetch == 0 || need_rebuild_reader {
597                                        Self::replace_with_new_batch_reader(
598                                            &mut splits_on_fetch,
599                                            &state_store_handler,
600                                            core.column_ids.clone(),
601                                            self.build_source_ctx(
602                                                &source_desc,
603                                                core.source_id,
604                                                &core.source_name,
605                                            ),
606                                            source_desc.clone(),
607                                            &mut stream,
608                                            self.rate_limit_rps,
609                                            self.streaming_config.clone(),
610                                        )
611                                        .await?;
612                                        iceberg_metrics
613                                            .iceberg_source_inflight_file_count
614                                            .with_guarded_label_values(&metrics_labels)
615                                            .set(splits_on_fetch as i64);
616                                    }
617                                }
618                                // Receiving file assignments from upstream list executor,
619                                // store into state table.
620                                Message::Chunk(chunk) => {
621                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
622                                        .data_chunk()
623                                        .rows()
624                                        .map(|row| {
625                                            let file_name = row.datum_at(0).unwrap().into_utf8();
626                                            let split = row.datum_at(1).unwrap().into_jsonb();
627                                            (file_name.to_owned(), split.to_owned_scalar())
628                                        })
629                                        .collect();
630                                    state_store_handler.set_states_json(jsonb_values).await?;
631                                    state_store_handler.try_flush().await?;
632                                }
633                                Message::Watermark(_) => unreachable!(),
634                            }
635                        }
636                        // StreamChunk from FsSourceReader, and the reader reads only one file.
637                        Either::Right(ChunksWithState {
638                            chunks,
639                            data_file_path,
640                            last_read_pos: _,
641                        }) => {
642                            // TODO: support persist progress after supporting reading part of a file.
643                            if true {
644                                splits_on_fetch = splits_on_fetch.saturating_sub(1);
645                                state_store_handler.delete(&data_file_path).await?;
646                                iceberg_metrics
647                                    .iceberg_source_inflight_file_count
648                                    .with_guarded_label_values(&metrics_labels)
649                                    .set(splits_on_fetch as i64);
650                            }
651
652                            for chunk in &chunks {
653                                let chunk = prune_additional_cols(
654                                    chunk,
655                                    &[file_path_idx, file_pos_idx],
656                                    &source_desc.columns,
657                                );
658                                // pad row_id
659                                let (chunk, op) = chunk.into_parts();
660                                let (mut columns, visibility) = chunk.into_parts();
661                                columns.insert(
662                                    row_id_idx,
663                                    Arc::new(
664                                        SerialArray::from_iter_bitmap(
665                                            itertools::repeat_n(Serial::from(0), columns[0].len()),
666                                            Bitmap::zeros(columns[0].len()),
667                                        )
668                                        .into(),
669                                    ),
670                                );
671                                let chunk = StreamChunk::from_parts(
672                                    op,
673                                    DataChunk::from_parts(columns.into(), visibility),
674                                );
675
676                                yield Message::Chunk(chunk);
677                            }
678                        }
679                    }
680                }
681            }
682        }
683    }
684}
685
686impl<S: StateStore> Execute for IcebergFetchExecutor<S> {
687    fn execute(self: Box<Self>) -> BoxedMessageStream {
688        self.into_stream().boxed()
689    }
690}
691
692impl<S: StateStore> Debug for IcebergFetchExecutor<S> {
693    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
694        if let Some(core) = &self.stream_source_core {
695            f.debug_struct("IcebergFetchExecutor")
696                .field("source_id", &core.source_id)
697                .field("column_ids", &core.column_ids)
698                .finish()
699        } else {
700            f.debug_struct("IcebergFetchExecutor").finish()
701        }
702    }
703}