risingwave_batch_executors/executor/iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::DataType;
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26    scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::error::BatchError;
33use crate::executor::Executor;
34use crate::monitor::BatchMetrics;
35
36pub struct IcebergScanExecutor {
37    iceberg_config: IcebergProperties,
38    file_scan_tasks: Option<IcebergFileScanTask>,
39    chunk_size: usize,
40    schema: Schema,
41    identity: String,
42    metrics: Option<BatchMetrics>,
43    need_seq_num: bool,
44    need_file_path_and_pos: bool,
45}
46
47impl Executor for IcebergScanExecutor {
48    fn schema(&self) -> &risingwave_common::catalog::Schema {
49        &self.schema
50    }
51
52    fn identity(&self) -> &str {
53        &self.identity
54    }
55
56    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
57        self.do_execute().boxed()
58    }
59}
60
61impl IcebergScanExecutor {
62    pub fn new(
63        iceberg_config: IcebergProperties,
64        file_scan_tasks: IcebergFileScanTask,
65        chunk_size: usize,
66        schema: Schema,
67        identity: String,
68        metrics: Option<BatchMetrics>,
69        need_seq_num: bool,
70        need_file_path_and_pos: bool,
71    ) -> Self {
72        Self {
73            iceberg_config,
74            chunk_size,
75            schema,
76            file_scan_tasks: Some(file_scan_tasks),
77            identity,
78            metrics,
79            need_seq_num,
80            need_file_path_and_pos,
81        }
82    }
83
84    #[try_stream(ok = DataChunk, error = BatchError)]
85    async fn do_execute(mut self: Box<Self>) {
86        let table = self.iceberg_config.load_table().await?;
87        let data_types = self.schema.data_types();
88
89        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
90            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
91            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
92                equality_delete_file_scan_tasks
93            }
94            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
95                position_delete_file_scan_tasks
96            }
97            None => {
98                bail!("file_scan_tasks must be Some")
99            }
100        };
101
102        for data_file_scan_task in data_file_scan_tasks {
103            #[for_await]
104            for chunk in scan_task_to_chunk_with_deletes(
105                table.clone(),
106                data_file_scan_task,
107                IcebergScanOpts {
108                    chunk_size: self.chunk_size,
109                    need_seq_num: self.need_seq_num,
110                    need_file_path_and_pos: self.need_file_path_and_pos,
111                    handle_delete_files: false,
112                },
113                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
114            ) {
115                let chunk = chunk?;
116                assert_eq!(chunk.data_types(), data_types);
117                yield chunk;
118            }
119        }
120    }
121}
122
123pub struct IcebergScanExecutorBuilder {}
124
125impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
126    async fn new_boxed_executor(
127        source: &ExecutorBuilder<'_>,
128        inputs: Vec<BoxedExecutor>,
129    ) -> crate::error::Result<BoxedExecutor> {
130        ensure!(
131            inputs.is_empty(),
132            "Iceberg source should not have input executor!"
133        );
134        let source_node = try_match_expand!(
135            source.plan_node().get_node_body().unwrap(),
136            NodeBody::IcebergScan
137        )?;
138
139        // prepare connector source
140        let options_with_secret = WithOptionsSecResolved::new(
141            source_node.with_properties.clone(),
142            source_node.secret_refs.clone(),
143        );
144        let config = ConnectorProperties::extract(options_with_secret, false)?;
145
146        let split_list = source_node
147            .split
148            .iter()
149            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
150            .collect_vec();
151        assert_eq!(split_list.len(), 1);
152
153        let fields = source_node
154            .columns
155            .iter()
156            .map(|prost| {
157                let column_desc = prost.column_desc.as_ref().unwrap();
158                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
159                let name = column_desc.name.clone();
160                Field::with_name(data_type, name)
161            })
162            .collect();
163        let schema = Schema::new(fields);
164        let metrics = source.context().batch_metrics();
165
166        if let ConnectorProperties::Iceberg(iceberg_properties) = config
167            && let SplitImpl::Iceberg(split) = &split_list[0]
168        {
169            let iceberg_properties: IcebergProperties = *iceberg_properties;
170            let split: IcebergSplit = split.clone();
171            let need_seq_num = schema
172                .fields()
173                .iter()
174                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
175            let need_file_path_and_pos = schema
176                .fields()
177                .iter()
178                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
179                && matches!(split.task, IcebergFileScanTask::Data(_));
180
181            Ok(Box::new(IcebergScanExecutor::new(
182                iceberg_properties,
183                split.task,
184                source.context().get_config().developer.chunk_size,
185                schema,
186                source.plan_node().get_identity().clone(),
187                metrics,
188                need_seq_num,
189                need_file_path_and_pos,
190            )))
191        } else {
192            unreachable!()
193        }
194    }
195}