risingwave_batch_executors/executor/
iceberg_scan.rs1use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20 Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::DataType;
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25 IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26 scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::error::BatchError;
33use crate::executor::Executor;
34use crate::monitor::BatchMetrics;
35
36pub struct IcebergScanExecutor {
37 iceberg_config: IcebergProperties,
38 file_scan_tasks: Option<IcebergFileScanTask>,
39 chunk_size: usize,
40 schema: Schema,
41 identity: String,
42 metrics: Option<BatchMetrics>,
43 need_seq_num: bool,
44 need_file_path_and_pos: bool,
45 limit: Option<u64>,
46}
47
48impl Executor for IcebergScanExecutor {
49 fn schema(&self) -> &risingwave_common::catalog::Schema {
50 &self.schema
51 }
52
53 fn identity(&self) -> &str {
54 &self.identity
55 }
56
57 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
58 self.do_execute().boxed()
59 }
60}
61
62impl IcebergScanExecutor {
63 pub fn new(
64 iceberg_config: IcebergProperties,
65 file_scan_tasks: IcebergFileScanTask,
66 chunk_size: usize,
67 schema: Schema,
68 identity: String,
69 metrics: Option<BatchMetrics>,
70 need_seq_num: bool,
71 need_file_path_and_pos: bool,
72 limit: Option<u64>,
73 ) -> Self {
74 Self {
75 iceberg_config,
76 chunk_size,
77 schema,
78 file_scan_tasks: Some(file_scan_tasks),
79 identity,
80 metrics,
81 need_seq_num,
82 need_file_path_and_pos,
83 limit,
84 }
85 }
86
87 #[try_stream(ok = DataChunk, error = BatchError)]
88 async fn do_execute(mut self: Box<Self>) {
89 let table = self.iceberg_config.load_table().await?;
90 let data_types = self.schema.data_types();
91
92 let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
93 Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
94 Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
95 equality_delete_file_scan_tasks
96 }
97 Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
98 position_delete_file_scan_tasks
99 }
100 None => {
101 bail!("file_scan_tasks must be Some")
102 }
103 };
104 let mut remaining_limit = self
105 .limit
106 .map(|limit| usize::try_from(limit).unwrap_or(usize::MAX));
107
108 for data_file_scan_task in data_file_scan_tasks {
109 if matches!(remaining_limit, Some(0)) {
110 return Ok(());
111 }
112
113 #[for_await]
114 for chunk in scan_task_to_chunk_with_deletes(
115 table.clone(),
116 data_file_scan_task,
117 IcebergScanOpts {
118 chunk_size: self.chunk_size,
119 need_seq_num: self.need_seq_num,
120 need_file_path_and_pos: self.need_file_path_and_pos,
121 handle_delete_files: table.metadata().format_version()
122 >= iceberg::spec::FormatVersion::V3,
123 },
124 self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
125 ) {
126 let chunk = chunk?;
127 assert_eq!(chunk.data_types(), data_types);
128 if let Some(remaining) = &mut remaining_limit {
129 if chunk.cardinality() > *remaining {
130 yield take_first_visible_rows(chunk, *remaining);
131 return Ok(());
132 }
133
134 *remaining -= chunk.cardinality();
135 yield chunk;
136
137 if *remaining == 0 {
138 return Ok(());
139 }
140 } else {
141 yield chunk;
142 }
143 }
144 }
145 }
146}
147
/// Stateless builder that turns an `IcebergScan` plan node into a boxed
/// [`IcebergScanExecutor`].
pub struct IcebergScanExecutorBuilder {}
149
150impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
151 async fn new_boxed_executor(
152 source: &ExecutorBuilder<'_>,
153 inputs: Vec<BoxedExecutor>,
154 ) -> crate::error::Result<BoxedExecutor> {
155 ensure!(
156 inputs.is_empty(),
157 "Iceberg source should not have input executor!"
158 );
159 let source_node = try_match_expand!(
160 source.plan_node().get_node_body().unwrap(),
161 NodeBody::IcebergScan
162 )?;
163
164 let options_with_secret = WithOptionsSecResolved::new(
166 source_node.with_properties.clone(),
167 source_node.secret_refs.clone(),
168 );
169 let config = ConnectorProperties::extract(options_with_secret, false)?;
170
171 let split_list = source_node
172 .split
173 .iter()
174 .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
175 .collect_vec();
176 assert_eq!(split_list.len(), 1);
177
178 let fields = source_node
179 .columns
180 .iter()
181 .map(|prost| {
182 let column_desc = prost.column_desc.as_ref().unwrap();
183 let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
184 let name = column_desc.name.clone();
185 Field::with_name(data_type, name)
186 })
187 .collect();
188 let schema = Schema::new(fields);
189 let metrics = source.context().batch_metrics();
190
191 if let ConnectorProperties::Iceberg(iceberg_properties) = config
192 && let SplitImpl::Iceberg(split) = &split_list[0]
193 {
194 let iceberg_properties: IcebergProperties = *iceberg_properties;
195 let split: IcebergSplit = split.clone();
196 let need_seq_num = schema
197 .fields()
198 .iter()
199 .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
200 let need_file_path_and_pos = schema
201 .fields()
202 .iter()
203 .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
204 && matches!(split.task, IcebergFileScanTask::Data(_));
205
206 Ok(Box::new(IcebergScanExecutor::new(
207 iceberg_properties,
208 split.task,
209 source.context().get_config().developer.chunk_size,
210 schema,
211 source.plan_node().get_identity().clone(),
212 metrics,
213 need_seq_num,
214 need_file_path_and_pos,
215 split.limit,
216 )))
217 } else {
218 unreachable!()
219 }
220 }
221}
222
223fn take_first_visible_rows(chunk: DataChunk, limit: usize) -> DataChunk {
224 if limit >= chunk.cardinality() {
225 return chunk;
226 }
227
228 let indexes = chunk.visibility().iter_ones().take(limit).collect_vec();
229 chunk.reorder_rows(&indexes)
230}