risingwave_batch_executors/executor/iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::DataType;
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26    scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::error::BatchError;
33use crate::executor::Executor;
34use crate::monitor::BatchMetrics;
35
36pub struct IcebergScanExecutor {
37    iceberg_config: IcebergProperties,
38    file_scan_tasks: Option<IcebergFileScanTask>,
39    chunk_size: usize,
40    schema: Schema,
41    identity: String,
42    metrics: Option<BatchMetrics>,
43    need_seq_num: bool,
44    need_file_path_and_pos: bool,
45    limit: Option<u64>,
46}
47
48impl Executor for IcebergScanExecutor {
49    fn schema(&self) -> &risingwave_common::catalog::Schema {
50        &self.schema
51    }
52
53    fn identity(&self) -> &str {
54        &self.identity
55    }
56
57    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
58        self.do_execute().boxed()
59    }
60}
61
62impl IcebergScanExecutor {
63    pub fn new(
64        iceberg_config: IcebergProperties,
65        file_scan_tasks: IcebergFileScanTask,
66        chunk_size: usize,
67        schema: Schema,
68        identity: String,
69        metrics: Option<BatchMetrics>,
70        need_seq_num: bool,
71        need_file_path_and_pos: bool,
72        limit: Option<u64>,
73    ) -> Self {
74        Self {
75            iceberg_config,
76            chunk_size,
77            schema,
78            file_scan_tasks: Some(file_scan_tasks),
79            identity,
80            metrics,
81            need_seq_num,
82            need_file_path_and_pos,
83            limit,
84        }
85    }
86
87    #[try_stream(ok = DataChunk, error = BatchError)]
88    async fn do_execute(mut self: Box<Self>) {
89        let table = self.iceberg_config.load_table().await?;
90        let data_types = self.schema.data_types();
91
92        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
93            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
94            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
95                equality_delete_file_scan_tasks
96            }
97            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
98                position_delete_file_scan_tasks
99            }
100            None => {
101                bail!("file_scan_tasks must be Some")
102            }
103        };
104        let mut remaining_limit = self
105            .limit
106            .map(|limit| usize::try_from(limit).unwrap_or(usize::MAX));
107
108        for data_file_scan_task in data_file_scan_tasks {
109            if matches!(remaining_limit, Some(0)) {
110                return Ok(());
111            }
112
113            #[for_await]
114            for chunk in scan_task_to_chunk_with_deletes(
115                table.clone(),
116                data_file_scan_task,
117                IcebergScanOpts {
118                    chunk_size: self.chunk_size,
119                    need_seq_num: self.need_seq_num,
120                    need_file_path_and_pos: self.need_file_path_and_pos,
121                    handle_delete_files: table.metadata().format_version()
122                        >= iceberg::spec::FormatVersion::V3,
123                },
124                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
125            ) {
126                let chunk = chunk?;
127                assert_eq!(chunk.data_types(), data_types);
128                if let Some(remaining) = &mut remaining_limit {
129                    if chunk.cardinality() > *remaining {
130                        yield take_first_visible_rows(chunk, *remaining);
131                        return Ok(());
132                    }
133
134                    *remaining -= chunk.cardinality();
135                    yield chunk;
136
137                    if *remaining == 0 {
138                        return Ok(());
139                    }
140                } else {
141                    yield chunk;
142                }
143            }
144        }
145    }
146}
147
148pub struct IcebergScanExecutorBuilder {}
149
150impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
151    async fn new_boxed_executor(
152        source: &ExecutorBuilder<'_>,
153        inputs: Vec<BoxedExecutor>,
154    ) -> crate::error::Result<BoxedExecutor> {
155        ensure!(
156            inputs.is_empty(),
157            "Iceberg source should not have input executor!"
158        );
159        let source_node = try_match_expand!(
160            source.plan_node().get_node_body().unwrap(),
161            NodeBody::IcebergScan
162        )?;
163
164        // prepare connector source
165        let options_with_secret = WithOptionsSecResolved::new(
166            source_node.with_properties.clone(),
167            source_node.secret_refs.clone(),
168        );
169        let config = ConnectorProperties::extract(options_with_secret, false)?;
170
171        let split_list = source_node
172            .split
173            .iter()
174            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
175            .collect_vec();
176        assert_eq!(split_list.len(), 1);
177
178        let fields = source_node
179            .columns
180            .iter()
181            .map(|prost| {
182                let column_desc = prost.column_desc.as_ref().unwrap();
183                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
184                let name = column_desc.name.clone();
185                Field::with_name(data_type, name)
186            })
187            .collect();
188        let schema = Schema::new(fields);
189        let metrics = source.context().batch_metrics();
190
191        if let ConnectorProperties::Iceberg(iceberg_properties) = config
192            && let SplitImpl::Iceberg(split) = &split_list[0]
193        {
194            let iceberg_properties: IcebergProperties = *iceberg_properties;
195            let split: IcebergSplit = split.clone();
196            let need_seq_num = schema
197                .fields()
198                .iter()
199                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
200            let need_file_path_and_pos = schema
201                .fields()
202                .iter()
203                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
204                && matches!(split.task, IcebergFileScanTask::Data(_));
205
206            Ok(Box::new(IcebergScanExecutor::new(
207                iceberg_properties,
208                split.task,
209                source.context().get_config().developer.chunk_size,
210                schema,
211                source.plan_node().get_identity().clone(),
212                metrics,
213                need_seq_num,
214                need_file_path_and_pos,
215                split.limit,
216            )))
217        } else {
218            unreachable!()
219        }
220    }
221}
222
223fn take_first_visible_rows(chunk: DataChunk, limit: usize) -> DataChunk {
224    if limit >= chunk.cardinality() {
225        return chunk;
226    }
227
228    let indexes = chunk.visibility().iter_ones().take(limit).collect_vec();
229    chunk.reorder_rows(&indexes)
230}