// risingwave_batch_executors/executor/iceberg_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
26};
27use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
28use risingwave_expr::expr::LiteralExpression;
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::ValuesExecutor;
33use crate::error::BatchError;
34use crate::executor::Executor;
35use crate::monitor::BatchMetrics;
36
/// Batch executor that scans rows from an Iceberg table by executing the
/// file scan tasks carried in an Iceberg split.
pub struct IcebergScanExecutor {
    // Connection/catalog properties used to load the Iceberg table.
    iceberg_config: IcebergProperties,
    // Snapshot the tasks were planned against; not read during execution
    // (hence `dead_code`), retained on the struct for completeness.
    #[allow(dead_code)]
    snapshot_id: Option<i64>,
    // Scan tasks to execute. `Option` so `do_execute` can take ownership
    // of them exactly once out of `&mut self`.
    file_scan_tasks: Option<IcebergFileScanTask>,
    // Maximum number of rows per yielded `DataChunk`.
    chunk_size: usize,
    // Output schema; every yielded chunk is asserted to match its types.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
    // Optional metrics sink; `None` when batch metrics are disabled.
    metrics: Option<BatchMetrics>,
    // Whether chunks should include the Iceberg sequence-number column.
    need_seq_num: bool,
    // Whether chunks should include the file-path and position columns.
    need_file_path_and_pos: bool,
}
49
50impl Executor for IcebergScanExecutor {
51    fn schema(&self) -> &risingwave_common::catalog::Schema {
52        &self.schema
53    }
54
55    fn identity(&self) -> &str {
56        &self.identity
57    }
58
59    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
60        self.do_execute().boxed()
61    }
62}
63
64impl IcebergScanExecutor {
65    pub fn new(
66        iceberg_config: IcebergProperties,
67        snapshot_id: Option<i64>,
68        file_scan_tasks: IcebergFileScanTask,
69        chunk_size: usize,
70        schema: Schema,
71        identity: String,
72        metrics: Option<BatchMetrics>,
73        need_seq_num: bool,
74        need_file_path_and_pos: bool,
75    ) -> Self {
76        Self {
77            iceberg_config,
78            snapshot_id,
79            chunk_size,
80            schema,
81            file_scan_tasks: Some(file_scan_tasks),
82            identity,
83            metrics,
84            need_seq_num,
85            need_file_path_and_pos,
86        }
87    }
88
89    #[try_stream(ok = DataChunk, error = BatchError)]
90    async fn do_execute(mut self: Box<Self>) {
91        let table = self.iceberg_config.load_table().await?;
92        let data_types = self.schema.data_types();
93
94        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
95            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
96            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
97                equality_delete_file_scan_tasks
98            }
99            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
100                position_delete_file_scan_tasks
101            }
102            Some(IcebergFileScanTask::CountStar(_)) => {
103                bail!("iceberg scan executor does not support count star")
104            }
105            None => {
106                bail!("file_scan_tasks must be Some")
107            }
108        };
109
110        for data_file_scan_task in data_file_scan_tasks {
111            #[for_await]
112            for chunk in scan_task_to_chunk(
113                table.clone(),
114                data_file_scan_task,
115                IcebergScanOpts {
116                    chunk_size: self.chunk_size,
117                    need_seq_num: self.need_seq_num,
118                    need_file_path_and_pos: self.need_file_path_and_pos,
119                },
120                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
121            ) {
122                let chunk = chunk?;
123                assert_eq!(chunk.data_types(), data_types);
124                yield chunk;
125            }
126        }
127    }
128}
129
130pub struct IcebergScanExecutorBuilder {}
131
132impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
133    async fn new_boxed_executor(
134        source: &ExecutorBuilder<'_>,
135        inputs: Vec<BoxedExecutor>,
136    ) -> crate::error::Result<BoxedExecutor> {
137        ensure!(
138            inputs.is_empty(),
139            "Iceberg source should not have input executor!"
140        );
141        let source_node = try_match_expand!(
142            source.plan_node().get_node_body().unwrap(),
143            NodeBody::IcebergScan
144        )?;
145
146        // prepare connector source
147        let options_with_secret = WithOptionsSecResolved::new(
148            source_node.with_properties.clone(),
149            source_node.secret_refs.clone(),
150        );
151        let config = ConnectorProperties::extract(options_with_secret.clone(), false)?;
152
153        let split_list = source_node
154            .split
155            .iter()
156            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
157            .collect_vec();
158        assert_eq!(split_list.len(), 1);
159
160        let fields = source_node
161            .columns
162            .iter()
163            .map(|prost| {
164                let column_desc = prost.column_desc.as_ref().unwrap();
165                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
166                let name = column_desc.name.clone();
167                Field::with_name(data_type, name)
168            })
169            .collect();
170        let schema = Schema::new(fields);
171        let metrics = source.context().batch_metrics().clone();
172
173        if let ConnectorProperties::Iceberg(iceberg_properties) = config
174            && let SplitImpl::Iceberg(split) = &split_list[0]
175        {
176            if let IcebergFileScanTask::CountStar(count) = split.task {
177                return Ok(Box::new(ValuesExecutor::new(
178                    vec![vec![Box::new(LiteralExpression::new(
179                        DataType::Int64,
180                        Some(ScalarImpl::Int64(count as i64)),
181                    ))]],
182                    schema,
183                    source.plan_node().get_identity().clone(),
184                    source.context().get_config().developer.chunk_size,
185                )));
186            }
187            let iceberg_properties: IcebergProperties = *iceberg_properties;
188            let split: IcebergSplit = split.clone();
189            let need_seq_num = schema
190                .fields()
191                .iter()
192                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
193            let need_file_path_and_pos = schema
194                .fields()
195                .iter()
196                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
197                && matches!(split.task, IcebergFileScanTask::Data(_));
198
199            Ok(Box::new(IcebergScanExecutor::new(
200                iceberg_properties,
201                Some(split.snapshot_id),
202                split.task,
203                source.context().get_config().developer.chunk_size,
204                schema,
205                source.plan_node().get_identity().clone(),
206                metrics,
207                need_seq_num,
208                need_file_path_and_pos,
209            )))
210        } else {
211            unreachable!()
212        }
213    }
214}