risingwave_batch_executors/executor/
iceberg_scan.rs1use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20 Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25 IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
26};
27use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
28use risingwave_expr::expr::LiteralExpression;
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::ValuesExecutor;
33use crate::error::BatchError;
34use crate::executor::Executor;
35use crate::monitor::BatchMetrics;
36
/// Batch executor that scans the file scan tasks of one Iceberg split and
/// yields [`DataChunk`]s matching `schema`.
pub struct IcebergScanExecutor {
    // Resolved Iceberg connector/table properties; used to load the table
    // at execution time.
    iceberg_config: IcebergProperties,
    // Snapshot the scan was planned against; stored but not read by
    // `do_execute` (see `#[allow(dead_code)]`).
    #[allow(dead_code)]
    snapshot_id: Option<i64>,
    // Tasks to scan. Wrapped in `Option` so `do_execute` can take ownership
    // exactly once; `None` afterwards (or if never set) is an error.
    file_scan_tasks: Option<IcebergFileScanTask>,
    // Maximum number of rows per yielded chunk.
    chunk_size: usize,
    // Output schema; every produced chunk is asserted to match its data types.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
    // Optional batch metrics sink; iceberg scan metrics are derived from it.
    metrics: Option<BatchMetrics>,
    // Whether the output includes the hidden Iceberg sequence-number column.
    need_seq_num: bool,
    // Whether the output includes the hidden file-path/position columns.
    need_file_path_and_pos: bool,
}
49
50impl Executor for IcebergScanExecutor {
51 fn schema(&self) -> &risingwave_common::catalog::Schema {
52 &self.schema
53 }
54
55 fn identity(&self) -> &str {
56 &self.identity
57 }
58
59 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
60 self.do_execute().boxed()
61 }
62}
63
64impl IcebergScanExecutor {
65 pub fn new(
66 iceberg_config: IcebergProperties,
67 snapshot_id: Option<i64>,
68 file_scan_tasks: IcebergFileScanTask,
69 chunk_size: usize,
70 schema: Schema,
71 identity: String,
72 metrics: Option<BatchMetrics>,
73 need_seq_num: bool,
74 need_file_path_and_pos: bool,
75 ) -> Self {
76 Self {
77 iceberg_config,
78 snapshot_id,
79 chunk_size,
80 schema,
81 file_scan_tasks: Some(file_scan_tasks),
82 identity,
83 metrics,
84 need_seq_num,
85 need_file_path_and_pos,
86 }
87 }
88
89 #[try_stream(ok = DataChunk, error = BatchError)]
90 async fn do_execute(mut self: Box<Self>) {
91 let table = self.iceberg_config.load_table().await?;
92 let data_types = self.schema.data_types();
93
94 let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
95 Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
96 Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
97 equality_delete_file_scan_tasks
98 }
99 Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
100 position_delete_file_scan_tasks
101 }
102 Some(IcebergFileScanTask::CountStar(_)) => {
103 bail!("iceberg scan executor does not support count star")
104 }
105 None => {
106 bail!("file_scan_tasks must be Some")
107 }
108 };
109
110 for data_file_scan_task in data_file_scan_tasks {
111 #[for_await]
112 for chunk in scan_task_to_chunk(
113 table.clone(),
114 data_file_scan_task,
115 IcebergScanOpts {
116 chunk_size: self.chunk_size,
117 need_seq_num: self.need_seq_num,
118 need_file_path_and_pos: self.need_file_path_and_pos,
119 },
120 self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
121 ) {
122 let chunk = chunk?;
123 assert_eq!(chunk.data_types(), data_types);
124 yield chunk;
125 }
126 }
127 }
128}
129
/// Builder that constructs an [`IcebergScanExecutor`] (or a constant values
/// executor for a `COUNT(*)` task) from an `IcebergScan` plan node.
pub struct IcebergScanExecutorBuilder {}
131
132impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
133 async fn new_boxed_executor(
134 source: &ExecutorBuilder<'_>,
135 inputs: Vec<BoxedExecutor>,
136 ) -> crate::error::Result<BoxedExecutor> {
137 ensure!(
138 inputs.is_empty(),
139 "Iceberg source should not have input executor!"
140 );
141 let source_node = try_match_expand!(
142 source.plan_node().get_node_body().unwrap(),
143 NodeBody::IcebergScan
144 )?;
145
146 let options_with_secret = WithOptionsSecResolved::new(
148 source_node.with_properties.clone(),
149 source_node.secret_refs.clone(),
150 );
151 let config = ConnectorProperties::extract(options_with_secret.clone(), false)?;
152
153 let split_list = source_node
154 .split
155 .iter()
156 .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
157 .collect_vec();
158 assert_eq!(split_list.len(), 1);
159
160 let fields = source_node
161 .columns
162 .iter()
163 .map(|prost| {
164 let column_desc = prost.column_desc.as_ref().unwrap();
165 let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
166 let name = column_desc.name.clone();
167 Field::with_name(data_type, name)
168 })
169 .collect();
170 let schema = Schema::new(fields);
171 let metrics = source.context().batch_metrics().clone();
172
173 if let ConnectorProperties::Iceberg(iceberg_properties) = config
174 && let SplitImpl::Iceberg(split) = &split_list[0]
175 {
176 if let IcebergFileScanTask::CountStar(count) = split.task {
177 return Ok(Box::new(ValuesExecutor::new(
178 vec![vec![Box::new(LiteralExpression::new(
179 DataType::Int64,
180 Some(ScalarImpl::Int64(count as i64)),
181 ))]],
182 schema,
183 source.plan_node().get_identity().clone(),
184 source.context().get_config().developer.chunk_size,
185 )));
186 }
187 let iceberg_properties: IcebergProperties = *iceberg_properties;
188 let split: IcebergSplit = split.clone();
189 let need_seq_num = schema
190 .fields()
191 .iter()
192 .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
193 let need_file_path_and_pos = schema
194 .fields()
195 .iter()
196 .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
197 && matches!(split.task, IcebergFileScanTask::Data(_));
198
199 Ok(Box::new(IcebergScanExecutor::new(
200 iceberg_properties,
201 Some(split.snapshot_id),
202 split.task,
203 source.context().get_config().developer.chunk_size,
204 schema,
205 source.plan_node().get_identity().clone(),
206 metrics,
207 need_seq_num,
208 need_file_path_and_pos,
209 )))
210 } else {
211 unreachable!()
212 }
213 }
214}