risingwave_batch_executors/executor/
iceberg_scan.rs1use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use itertools::Itertools;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{
20 Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
21};
22use risingwave_common::types::DataType;
23use risingwave_connector::WithOptionsSecResolved;
24use risingwave_connector::source::iceberg::{
25 IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
26 scan_task_to_chunk_with_deletes,
27};
28use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
32use crate::error::BatchError;
33use crate::executor::Executor;
34use crate::monitor::BatchMetrics;
35
/// Batch executor that scans a set of Iceberg file scan tasks (data files,
/// equality-delete files, or position-delete files) and yields their rows as
/// [`DataChunk`]s.
pub struct IcebergScanExecutor {
    // Connector properties used to load the Iceberg table at execution time.
    iceberg_config: IcebergProperties,
    // Tasks to scan; taken (left as `None`) once execution starts, so the
    // executor can only be executed once.
    file_scan_tasks: Option<IcebergFileScanTask>,
    // Maximum number of rows per produced chunk.
    chunk_size: usize,
    // Output schema; every produced chunk is asserted to match its data types.
    schema: Schema,
    // Identity string reported via `Executor::identity` (logging/metrics).
    identity: String,
    // Optional metrics sink for scan statistics.
    metrics: Option<BatchMetrics>,
    // Whether the output contains the Iceberg sequence-number system column.
    need_seq_num: bool,
    // Whether the output contains the file-path / row-position system columns.
    need_file_path_and_pos: bool,
}
46
47impl Executor for IcebergScanExecutor {
48 fn schema(&self) -> &risingwave_common::catalog::Schema {
49 &self.schema
50 }
51
52 fn identity(&self) -> &str {
53 &self.identity
54 }
55
56 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
57 self.do_execute().boxed()
58 }
59}
60
61impl IcebergScanExecutor {
62 pub fn new(
63 iceberg_config: IcebergProperties,
64 file_scan_tasks: IcebergFileScanTask,
65 chunk_size: usize,
66 schema: Schema,
67 identity: String,
68 metrics: Option<BatchMetrics>,
69 need_seq_num: bool,
70 need_file_path_and_pos: bool,
71 ) -> Self {
72 Self {
73 iceberg_config,
74 chunk_size,
75 schema,
76 file_scan_tasks: Some(file_scan_tasks),
77 identity,
78 metrics,
79 need_seq_num,
80 need_file_path_and_pos,
81 }
82 }
83
84 #[try_stream(ok = DataChunk, error = BatchError)]
85 async fn do_execute(mut self: Box<Self>) {
86 let table = self.iceberg_config.load_table().await?;
87 let data_types = self.schema.data_types();
88
89 let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
90 Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
91 Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
92 equality_delete_file_scan_tasks
93 }
94 Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
95 position_delete_file_scan_tasks
96 }
97 None => {
98 bail!("file_scan_tasks must be Some")
99 }
100 };
101
102 for data_file_scan_task in data_file_scan_tasks {
103 #[for_await]
104 for chunk in scan_task_to_chunk_with_deletes(
105 table.clone(),
106 data_file_scan_task,
107 IcebergScanOpts {
108 chunk_size: self.chunk_size,
109 need_seq_num: self.need_seq_num,
110 need_file_path_and_pos: self.need_file_path_and_pos,
111 handle_delete_files: table.metadata().format_version()
112 >= iceberg::spec::FormatVersion::V3,
113 },
114 self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
115 ) {
116 let chunk = chunk?;
117 assert_eq!(chunk.data_types(), data_types);
118 yield chunk;
119 }
120 }
121 }
122}
123
124pub struct IcebergScanExecutorBuilder {}
125
126impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
127 async fn new_boxed_executor(
128 source: &ExecutorBuilder<'_>,
129 inputs: Vec<BoxedExecutor>,
130 ) -> crate::error::Result<BoxedExecutor> {
131 ensure!(
132 inputs.is_empty(),
133 "Iceberg source should not have input executor!"
134 );
135 let source_node = try_match_expand!(
136 source.plan_node().get_node_body().unwrap(),
137 NodeBody::IcebergScan
138 )?;
139
140 let options_with_secret = WithOptionsSecResolved::new(
142 source_node.with_properties.clone(),
143 source_node.secret_refs.clone(),
144 );
145 let config = ConnectorProperties::extract(options_with_secret, false)?;
146
147 let split_list = source_node
148 .split
149 .iter()
150 .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
151 .collect_vec();
152 assert_eq!(split_list.len(), 1);
153
154 let fields = source_node
155 .columns
156 .iter()
157 .map(|prost| {
158 let column_desc = prost.column_desc.as_ref().unwrap();
159 let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
160 let name = column_desc.name.clone();
161 Field::with_name(data_type, name)
162 })
163 .collect();
164 let schema = Schema::new(fields);
165 let metrics = source.context().batch_metrics();
166
167 if let ConnectorProperties::Iceberg(iceberg_properties) = config
168 && let SplitImpl::Iceberg(split) = &split_list[0]
169 {
170 let iceberg_properties: IcebergProperties = *iceberg_properties;
171 let split: IcebergSplit = split.clone();
172 let need_seq_num = schema
173 .fields()
174 .iter()
175 .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
176 let need_file_path_and_pos = schema
177 .fields()
178 .iter()
179 .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
180 && matches!(split.task, IcebergFileScanTask::Data(_));
181
182 Ok(Box::new(IcebergScanExecutor::new(
183 iceberg_properties,
184 split.task,
185 source.context().get_config().developer.chunk_size,
186 schema,
187 source.plan_node().get_identity().clone(),
188 metrics,
189 need_seq_num,
190 need_file_path_and_pos,
191 )))
192 } else {
193 unreachable!()
194 }
195 }
196}