risingwave_batch_executors/executor/
iceberg_scan.rs1use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20 Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25 IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26 scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_expr::expr::LiteralExpression;
30use risingwave_pb::batch_plan::plan_node::NodeBody;
31
32use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
33use crate::ValuesExecutor;
34use crate::error::BatchError;
35use crate::executor::Executor;
36use crate::monitor::BatchMetrics;
37
/// Batch executor that scans a set of Iceberg file-scan tasks and yields
/// [`DataChunk`]s matching `schema`.
pub struct IcebergScanExecutor {
    // Connection/catalog properties used to load the Iceberg table.
    iceberg_config: IcebergProperties,
    #[allow(dead_code)]
    // Snapshot the split was planned against; kept for debugging, not read here.
    snapshot_id: Option<i64>,
    // Tasks to scan; `Option` so `do_execute` can move them out of `self`.
    // NOTE(review): expected to never be the `CountStar` variant — the builder
    // answers count-star with a `ValuesExecutor` instead.
    file_scan_tasks: Option<IcebergFileScanTask>,
    // Maximum number of rows per emitted chunk.
    chunk_size: usize,
    // Output schema; every yielded chunk is asserted to match its data types.
    schema: Schema,
    identity: String,
    // Optional batch metrics sink (iceberg scan metrics are derived from it).
    metrics: Option<BatchMetrics>,
    // Whether to emit the Iceberg sequence-number metadata column.
    need_seq_num: bool,
    // Whether to emit the file-path/position metadata columns.
    need_file_path_and_pos: bool,
}
50
51impl Executor for IcebergScanExecutor {
52 fn schema(&self) -> &risingwave_common::catalog::Schema {
53 &self.schema
54 }
55
56 fn identity(&self) -> &str {
57 &self.identity
58 }
59
60 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
61 self.do_execute().boxed()
62 }
63}
64
65impl IcebergScanExecutor {
66 pub fn new(
67 iceberg_config: IcebergProperties,
68 snapshot_id: Option<i64>,
69 file_scan_tasks: IcebergFileScanTask,
70 chunk_size: usize,
71 schema: Schema,
72 identity: String,
73 metrics: Option<BatchMetrics>,
74 need_seq_num: bool,
75 need_file_path_and_pos: bool,
76 ) -> Self {
77 Self {
78 iceberg_config,
79 snapshot_id,
80 chunk_size,
81 schema,
82 file_scan_tasks: Some(file_scan_tasks),
83 identity,
84 metrics,
85 need_seq_num,
86 need_file_path_and_pos,
87 }
88 }
89
90 #[try_stream(ok = DataChunk, error = BatchError)]
91 async fn do_execute(mut self: Box<Self>) {
92 let table = self.iceberg_config.load_table().await?;
93 let data_types = self.schema.data_types();
94
95 let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
96 Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
97 Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
98 equality_delete_file_scan_tasks
99 }
100 Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
101 position_delete_file_scan_tasks
102 }
103 Some(IcebergFileScanTask::CountStar(_)) => {
104 bail!("iceberg scan executor does not support count star")
105 }
106 None => {
107 bail!("file_scan_tasks must be Some")
108 }
109 };
110
111 for data_file_scan_task in data_file_scan_tasks {
112 #[for_await]
113 for chunk in scan_task_to_chunk_with_deletes(
114 table.clone(),
115 data_file_scan_task,
116 IcebergScanOpts {
117 chunk_size: self.chunk_size,
118 need_seq_num: self.need_seq_num,
119 need_file_path_and_pos: self.need_file_path_and_pos,
120 handle_delete_files: false,
121 },
122 self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
123 ) {
124 let chunk = chunk?;
125 assert_eq!(chunk.data_types(), data_types);
126 yield chunk;
127 }
128 }
129 }
130}
131
/// Builder that constructs an [`IcebergScanExecutor`] from an `IcebergScan`
/// batch plan node.
pub struct IcebergScanExecutorBuilder {}
133
134impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
135 async fn new_boxed_executor(
136 source: &ExecutorBuilder<'_>,
137 inputs: Vec<BoxedExecutor>,
138 ) -> crate::error::Result<BoxedExecutor> {
139 ensure!(
140 inputs.is_empty(),
141 "Iceberg source should not have input executor!"
142 );
143 let source_node = try_match_expand!(
144 source.plan_node().get_node_body().unwrap(),
145 NodeBody::IcebergScan
146 )?;
147
148 let options_with_secret = WithOptionsSecResolved::new(
150 source_node.with_properties.clone(),
151 source_node.secret_refs.clone(),
152 );
153 let config = ConnectorProperties::extract(options_with_secret, false)?;
154
155 let split_list = source_node
156 .split
157 .iter()
158 .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
159 .collect_vec();
160 assert_eq!(split_list.len(), 1);
161
162 let fields = source_node
163 .columns
164 .iter()
165 .map(|prost| {
166 let column_desc = prost.column_desc.as_ref().unwrap();
167 let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
168 let name = column_desc.name.clone();
169 Field::with_name(data_type, name)
170 })
171 .collect();
172 let schema = Schema::new(fields);
173 let metrics = source.context().batch_metrics();
174
175 if let ConnectorProperties::Iceberg(iceberg_properties) = config
176 && let SplitImpl::Iceberg(split) = &split_list[0]
177 {
178 if let IcebergFileScanTask::CountStar(count) = split.task {
179 return Ok(Box::new(ValuesExecutor::new(
180 vec![vec![Box::new(LiteralExpression::new(
181 DataType::Int64,
182 Some(ScalarImpl::Int64(count as i64)),
183 ))]],
184 schema,
185 source.plan_node().get_identity().clone(),
186 source.context().get_config().developer.chunk_size,
187 )));
188 }
189 let iceberg_properties: IcebergProperties = *iceberg_properties;
190 let split: IcebergSplit = split.clone();
191 let need_seq_num = schema
192 .fields()
193 .iter()
194 .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
195 let need_file_path_and_pos = schema
196 .fields()
197 .iter()
198 .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
199 && matches!(split.task, IcebergFileScanTask::Data(_));
200
201 Ok(Box::new(IcebergScanExecutor::new(
202 iceberg_properties,
203 Some(split.snapshot_id),
204 split.task,
205 source.context().get_config().developer.chunk_size,
206 schema,
207 source.plan_node().get_identity().clone(),
208 metrics,
209 need_seq_num,
210 need_file_path_and_pos,
211 )))
212 } else {
213 unreachable!()
214 }
215 }
216}