risingwave_batch_executors/executor/iceberg_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26    scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_expr::expr::LiteralExpression;
30use risingwave_pb::batch_plan::plan_node::NodeBody;
31
32use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
33use crate::ValuesExecutor;
34use crate::error::BatchError;
35use crate::executor::Executor;
36use crate::monitor::BatchMetrics;
37
38pub struct IcebergScanExecutor {
39    iceberg_config: IcebergProperties,
40    #[allow(dead_code)]
41    snapshot_id: Option<i64>,
42    file_scan_tasks: Option<IcebergFileScanTask>,
43    chunk_size: usize,
44    schema: Schema,
45    identity: String,
46    metrics: Option<BatchMetrics>,
47    need_seq_num: bool,
48    need_file_path_and_pos: bool,
49}
50
51impl Executor for IcebergScanExecutor {
52    fn schema(&self) -> &risingwave_common::catalog::Schema {
53        &self.schema
54    }
55
56    fn identity(&self) -> &str {
57        &self.identity
58    }
59
60    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
61        self.do_execute().boxed()
62    }
63}
64
65impl IcebergScanExecutor {
66    pub fn new(
67        iceberg_config: IcebergProperties,
68        snapshot_id: Option<i64>,
69        file_scan_tasks: IcebergFileScanTask,
70        chunk_size: usize,
71        schema: Schema,
72        identity: String,
73        metrics: Option<BatchMetrics>,
74        need_seq_num: bool,
75        need_file_path_and_pos: bool,
76    ) -> Self {
77        Self {
78            iceberg_config,
79            snapshot_id,
80            chunk_size,
81            schema,
82            file_scan_tasks: Some(file_scan_tasks),
83            identity,
84            metrics,
85            need_seq_num,
86            need_file_path_and_pos,
87        }
88    }
89
90    #[try_stream(ok = DataChunk, error = BatchError)]
91    async fn do_execute(mut self: Box<Self>) {
92        let table = self.iceberg_config.load_table().await?;
93        let data_types = self.schema.data_types();
94
95        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
96            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
97            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
98                equality_delete_file_scan_tasks
99            }
100            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
101                position_delete_file_scan_tasks
102            }
103            Some(IcebergFileScanTask::CountStar(_)) => {
104                bail!("iceberg scan executor does not support count star")
105            }
106            None => {
107                bail!("file_scan_tasks must be Some")
108            }
109        };
110
111        for data_file_scan_task in data_file_scan_tasks {
112            #[for_await]
113            for chunk in scan_task_to_chunk_with_deletes(
114                table.clone(),
115                data_file_scan_task,
116                IcebergScanOpts {
117                    chunk_size: self.chunk_size,
118                    need_seq_num: self.need_seq_num,
119                    need_file_path_and_pos: self.need_file_path_and_pos,
120                    handle_delete_files: false,
121                },
122                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
123            ) {
124                let chunk = chunk?;
125                assert_eq!(chunk.data_types(), data_types);
126                yield chunk;
127            }
128        }
129    }
130}
131
132pub struct IcebergScanExecutorBuilder {}
133
134impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
135    async fn new_boxed_executor(
136        source: &ExecutorBuilder<'_>,
137        inputs: Vec<BoxedExecutor>,
138    ) -> crate::error::Result<BoxedExecutor> {
139        ensure!(
140            inputs.is_empty(),
141            "Iceberg source should not have input executor!"
142        );
143        let source_node = try_match_expand!(
144            source.plan_node().get_node_body().unwrap(),
145            NodeBody::IcebergScan
146        )?;
147
148        // prepare connector source
149        let options_with_secret = WithOptionsSecResolved::new(
150            source_node.with_properties.clone(),
151            source_node.secret_refs.clone(),
152        );
153        let config = ConnectorProperties::extract(options_with_secret, false)?;
154
155        let split_list = source_node
156            .split
157            .iter()
158            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
159            .collect_vec();
160        assert_eq!(split_list.len(), 1);
161
162        let fields = source_node
163            .columns
164            .iter()
165            .map(|prost| {
166                let column_desc = prost.column_desc.as_ref().unwrap();
167                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
168                let name = column_desc.name.clone();
169                Field::with_name(data_type, name)
170            })
171            .collect();
172        let schema = Schema::new(fields);
173        let metrics = source.context().batch_metrics();
174
175        if let ConnectorProperties::Iceberg(iceberg_properties) = config
176            && let SplitImpl::Iceberg(split) = &split_list[0]
177        {
178            if let IcebergFileScanTask::CountStar(count) = split.task {
179                return Ok(Box::new(ValuesExecutor::new(
180                    vec![vec![Box::new(LiteralExpression::new(
181                        DataType::Int64,
182                        Some(ScalarImpl::Int64(count as i64)),
183                    ))]],
184                    schema,
185                    source.plan_node().get_identity().clone(),
186                    source.context().get_config().developer.chunk_size,
187                )));
188            }
189            let iceberg_properties: IcebergProperties = *iceberg_properties;
190            let split: IcebergSplit = split.clone();
191            let need_seq_num = schema
192                .fields()
193                .iter()
194                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
195            let need_file_path_and_pos = schema
196                .fields()
197                .iter()
198                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
199                && matches!(split.task, IcebergFileScanTask::Data(_));
200
201            Ok(Box::new(IcebergScanExecutor::new(
202                iceberg_properties,
203                Some(split.snapshot_id),
204                split.task,
205                source.context().get_config().developer.chunk_size,
206                schema,
207                source.plan_node().get_identity().clone(),
208                metrics,
209                need_seq_num,
210                need_file_path_and_pos,
211            )))
212        } else {
213            unreachable!()
214        }
215    }
216}