risingwave_batch_executors/executor/
iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::DataType;
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26    scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::error::BatchError;
33use crate::executor::Executor;
34use crate::monitor::BatchMetrics;
35
36pub struct IcebergScanExecutor {
37    iceberg_config: IcebergProperties,
38    file_scan_tasks: Option<IcebergFileScanTask>,
39    chunk_size: usize,
40    schema: Schema,
41    identity: String,
42    metrics: Option<BatchMetrics>,
43    need_seq_num: bool,
44    need_file_path_and_pos: bool,
45}
46
47impl Executor for IcebergScanExecutor {
48    fn schema(&self) -> &risingwave_common::catalog::Schema {
49        &self.schema
50    }
51
52    fn identity(&self) -> &str {
53        &self.identity
54    }
55
56    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
57        self.do_execute().boxed()
58    }
59}
60
61impl IcebergScanExecutor {
62    pub fn new(
63        iceberg_config: IcebergProperties,
64        file_scan_tasks: IcebergFileScanTask,
65        chunk_size: usize,
66        schema: Schema,
67        identity: String,
68        metrics: Option<BatchMetrics>,
69        need_seq_num: bool,
70        need_file_path_and_pos: bool,
71    ) -> Self {
72        Self {
73            iceberg_config,
74            chunk_size,
75            schema,
76            file_scan_tasks: Some(file_scan_tasks),
77            identity,
78            metrics,
79            need_seq_num,
80            need_file_path_and_pos,
81        }
82    }
83
84    #[try_stream(ok = DataChunk, error = BatchError)]
85    async fn do_execute(mut self: Box<Self>) {
86        let table = self.iceberg_config.load_table().await?;
87        let data_types = self.schema.data_types();
88
89        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
90            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
91            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
92                equality_delete_file_scan_tasks
93            }
94            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
95                position_delete_file_scan_tasks
96            }
97            None => {
98                bail!("file_scan_tasks must be Some")
99            }
100        };
101
102        for data_file_scan_task in data_file_scan_tasks {
103            #[for_await]
104            for chunk in scan_task_to_chunk_with_deletes(
105                table.clone(),
106                data_file_scan_task,
107                IcebergScanOpts {
108                    chunk_size: self.chunk_size,
109                    need_seq_num: self.need_seq_num,
110                    need_file_path_and_pos: self.need_file_path_and_pos,
111                    handle_delete_files: table.metadata().format_version()
112                        >= iceberg::spec::FormatVersion::V3,
113                },
114                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
115            ) {
116                let chunk = chunk?;
117                assert_eq!(chunk.data_types(), data_types);
118                yield chunk;
119            }
120        }
121    }
122}
123
/// Stateless builder type; its [`BoxedExecutorBuilder`] implementation
/// constructs an [`IcebergScanExecutor`] from a batch plan node.
pub struct IcebergScanExecutorBuilder {}
125
126impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
127    async fn new_boxed_executor(
128        source: &ExecutorBuilder<'_>,
129        inputs: Vec<BoxedExecutor>,
130    ) -> crate::error::Result<BoxedExecutor> {
131        ensure!(
132            inputs.is_empty(),
133            "Iceberg source should not have input executor!"
134        );
135        let source_node = try_match_expand!(
136            source.plan_node().get_node_body().unwrap(),
137            NodeBody::IcebergScan
138        )?;
139
140        // prepare connector source
141        let options_with_secret = WithOptionsSecResolved::new(
142            source_node.with_properties.clone(),
143            source_node.secret_refs.clone(),
144        );
145        let config = ConnectorProperties::extract(options_with_secret, false)?;
146
147        let split_list = source_node
148            .split
149            .iter()
150            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
151            .collect_vec();
152        assert_eq!(split_list.len(), 1);
153
154        let fields = source_node
155            .columns
156            .iter()
157            .map(|prost| {
158                let column_desc = prost.column_desc.as_ref().unwrap();
159                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
160                let name = column_desc.name.clone();
161                Field::with_name(data_type, name)
162            })
163            .collect();
164        let schema = Schema::new(fields);
165        let metrics = source.context().batch_metrics();
166
167        if let ConnectorProperties::Iceberg(iceberg_properties) = config
168            && let SplitImpl::Iceberg(split) = &split_list[0]
169        {
170            let iceberg_properties: IcebergProperties = *iceberg_properties;
171            let split: IcebergSplit = split.clone();
172            let need_seq_num = schema
173                .fields()
174                .iter()
175                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
176            let need_file_path_and_pos = schema
177                .fields()
178                .iter()
179                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
180                && matches!(split.task, IcebergFileScanTask::Data(_));
181
182            Ok(Box::new(IcebergScanExecutor::new(
183                iceberg_properties,
184                split.task,
185                source.context().get_config().developer.chunk_size,
186                schema,
187                source.plan_node().get_identity().clone(),
188                metrics,
189                need_seq_num,
190                need_file_path_and_pos,
191            )))
192        } else {
193            unreachable!()
194        }
195    }
196}