risingwave_batch_executors/executor/iceberg_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
    Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
};
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::source::iceberg::{
    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
    scan_task_to_chunk_with_deletes,
};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::Executor;
use crate::monitor::BatchMetrics;

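/// Batch executor that scans the file scan tasks of one Iceberg split (data files,
/// equality-delete files, or position-delete files) and streams the rows out as
/// `DataChunk`s.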
pub struct IcebergScanExecutor {
    iceberg_config: IcebergProperties,
    file_scan_tasks: Option<IcebergFileScanTask>,
    chunk_size: usize,
    schema: Schema,
    identity: String,
    metrics: Option<BatchMetrics>,
    need_seq_num: bool,
    need_file_path_and_pos: bool,
}

impl Executor for IcebergScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl IcebergScanExecutor {
    pub fn new(
        iceberg_config: IcebergProperties,
        file_scan_tasks: IcebergFileScanTask,
        chunk_size: usize,
        schema: Schema,
        identity: String,
        metrics: Option<BatchMetrics>,
        need_seq_num: bool,
        need_file_path_and_pos: bool,
    ) -> Self {
        Self {
            iceberg_config,
            chunk_size,
            schema,
            file_scan_tasks: Some(file_scan_tasks),
            identity,
            metrics,
            need_seq_num,
            need_file_path_and_pos,
        }
    }

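    /// Loads the Iceberg table, then converts each file scan task into a stream of
    /// `DataChunk`s using the configured `chunk_size`.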
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(mut self: Box<Self>) {
        let table = self.iceberg_config.load_table().await?;
        let data_types = self.schema.data_types();

        let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
            Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
            Some(IcebergFileScanTask::EqualityDelete(equality_delete_file_scan_tasks)) => {
                equality_delete_file_scan_tasks
            }
            Some(IcebergFileScanTask::PositionDelete(position_delete_file_scan_tasks)) => {
                position_delete_file_scan_tasks
            }
            None => {
                bail!("file_scan_tasks must be Some")
            }
        };

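        // Stream chunks out of every scan task. Delete files are not applied at this
        // level (`handle_delete_files: false`); each chunk is checked against the
        // executor's schema before being yielded.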
        for data_file_scan_task in data_file_scan_tasks {
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                data_file_scan_task,
                IcebergScanOpts {
                    chunk_size: self.chunk_size,
                    need_seq_num: self.need_seq_num,
                    need_file_path_and_pos: self.need_file_path_and_pos,
                    handle_delete_files: false,
                },
                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
            ) {
                let chunk = chunk?;
                assert_eq!(chunk.data_types(), data_types);
                yield chunk;
            }
        }
    }
}

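/// Builds an [`IcebergScanExecutor`] from an `IcebergScan` batch plan node.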
pub struct IcebergScanExecutorBuilder {}

impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "Iceberg source should not have input executor!"
        );
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::IcebergScan
        )?;

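        // Combine the WITH properties with their resolved secret references and turn
        // them into typed connector properties.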
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret, false)?;

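        // Restore the serialized splits; exactly one split is expected per scan executor.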
        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();
        assert_eq!(split_list.len(), 1);

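        // Build the output schema from the plan node's column descriptors.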
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);
        let metrics = source.context().batch_metrics();

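        // Hidden metadata columns are only requested when the output schema contains
        // them; file path / position additionally requires a data-file scan task.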
        if let ConnectorProperties::Iceberg(iceberg_properties) = config
            && let SplitImpl::Iceberg(split) = &split_list[0]
        {
            let iceberg_properties: IcebergProperties = *iceberg_properties;
            let split: IcebergSplit = split.clone();
            let need_seq_num = schema
                .fields()
                .iter()
                .any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
            let need_file_path_and_pos = schema
                .fields()
                .iter()
                .any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
                && matches!(split.task, IcebergFileScanTask::Data(_));

            Ok(Box::new(IcebergScanExecutor::new(
                iceberg_properties,
                split.task,
                source.context().get_config().developer.chunk_size,
                schema,
                source.plan_node().get_identity().clone(),
                metrics,
                need_seq_num,
                need_file_path_and_pos,
            )))
        } else {
            unreachable!()
        }
    }
}