risingwave_stream/executor/source/batch_source/batch_iceberg_fetch.rs

use std::collections::VecDeque;

use either::Either;
use futures::stream;
use iceberg::scan::FileScanTask;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::Op;
use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
use risingwave_common::config::StreamingConfig;
use risingwave_common::id::TableId;
use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
use risingwave_connector::source::reader::desc::SourceDesc;
use thiserror_ext::AsReport;

use crate::executor::prelude::*;
use crate::executor::source::{
    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
};
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::task::LocalBarrierManager;
pub struct BatchIcebergFetchExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core source state; taken out of the executor when the stream starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream executor that lists files and assigns them for fetching.
    upstream: Option<Executor>,

    /// Used to report that a refresh has finished loading.
    barrier_manager: LocalBarrierManager,

    streaming_config: Arc<StreamingConfig>,

    /// The table this batch (refreshable) source is associated with.
    associated_table_id: TableId,
}

impl<S: StateStore> BatchIcebergFetchExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        upstream: Executor,
        barrier_manager: LocalBarrierManager,
        streaming_config: Arc<StreamingConfig>,
        associated_table_id: Option<TableId>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core: Some(stream_source_core),
            upstream: Some(upstream),
            barrier_manager,
            streaming_config,
            associated_table_id: associated_table_id
                .expect("batch iceberg fetch requires an associated table id"),
        }
    }
}

impl<S: StateStore> BatchIcebergFetchExecutor<S> {
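    /// Drives the executor: forwards barriers, handles `RefreshStart` /
    /// `ListFinish` mutations, queues incoming file assignments, and swaps
    /// batch readers in and out as the queue drains.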
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut upstream = self.upstream.take().unwrap().execute();
        let barrier = expect_first_barrier(&mut upstream).await?;
        yield Message::Barrier(barrier);

        // Refresh-cycle state: whether a refresh is in progress, whether the
        // upstream listing has finished, how many scan tasks the current
        // batch reader still has in flight, and the files waiting to be read.
        let mut is_refreshing = false;
        let mut is_list_finished = false;
        let mut splits_on_fetch: usize = 0;
        let is_load_finished = Arc::new(RwLock::new(false));
        let mut file_queue = VecDeque::new();

        let mut core = self.stream_source_core.take().unwrap();
        let source_desc_builder = core.source_desc_builder.take().unwrap();
        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Positions of the hidden file-path and file-position columns; they
        // are pruned before chunks are emitted downstream.
        let file_path_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
            .unwrap();
        let file_pos_idx = source_desc
            .columns
            .iter()
            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
            .unwrap();

        // Start with a pending data stream; a real batch reader is swapped in
        // once files are queued.
        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
            upstream,
            stream::pending().boxed(),
        );

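        // Main loop. `Left` carries upstream messages (barriers and file
        // assignments); `Right` carries chunks read from Iceberg data files.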
        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::error!(error = %e.as_report(), "Fetch Error");
                    file_queue.clear();
                    *is_load_finished.write() = false;
                    return Err(e);
                }
                Ok(msg) => {
                    match msg {
                        Either::Left(msg) => {
                            match msg {
                                Message::Barrier(barrier) => {
                                    let mut need_rebuild_reader = false;
                                    if let Some(mutation) = barrier.mutation.as_deref() {
                                        match mutation {
                                            Mutation::Pause => stream.pause_stream(),
                                            Mutation::Resume => stream.resume_stream(),
                                            // A refresh restarts the load from scratch:
                                            // drop queued and in-flight work, then rebuild
                                            // the batch reader after the barrier.
                                            Mutation::RefreshStart {
                                                associated_source_id,
                                                ..
                                            } if associated_source_id == &core.source_id => {
                                                tracing::info!(
                                                    ?barrier.epoch,
                                                    actor_id = %self.actor_ctx.id,
                                                    source_id = %core.source_id,
                                                    table_id = %self.associated_table_id,
                                                    "RefreshStart:"
                                                );

                                                file_queue.clear();
                                                splits_on_fetch = 0;
                                                is_refreshing = true;
                                                is_list_finished = false;
                                                *is_load_finished.write() = false;

                                                need_rebuild_reader = true;
                                            }
                                            // Upstream has finished listing files for this
                                            // refresh; once the queue drains we can report
                                            // load finished.
                                            Mutation::ListFinish {
                                                associated_source_id,
                                            } if associated_source_id == &core.source_id => {
                                                tracing::info!(
                                                    ?barrier.epoch,
                                                    actor_id = %self.actor_ctx.id,
                                                    source_id = %core.source_id,
                                                    table_id = %self.associated_table_id,
                                                    "ListFinish:"
                                                );
                                                is_list_finished = true;
                                            }
                                            _ => {}
                                        }
                                    }

                                    // Everything listed has been fetched: report load
                                    // finished at this checkpoint and leave the refresh.
                                    if splits_on_fetch == 0
                                        && file_queue.is_empty()
                                        && is_list_finished
                                        && is_refreshing
                                        && barrier.is_checkpoint()
                                    {
                                        tracing::info!(
                                            ?barrier.epoch,
                                            actor_id = %self.actor_ctx.id,
                                            source_id = %core.source_id,
                                            table_id = %self.associated_table_id,
                                            "Reporting load finished"
                                        );
                                        self.barrier_manager.report_source_load_finished(
                                            barrier.epoch,
                                            self.actor_ctx.id,
                                            self.associated_table_id,
                                            core.source_id,
                                        );

                                        is_list_finished = false;
                                        is_refreshing = false;
                                    }

                                    yield Message::Barrier(barrier);

                                    // After yielding the barrier, build a new batch reader
                                    // if a refresh just started, or if the previous batch
                                    // drained and more files are queued.
                                    if need_rebuild_reader
                                        || (splits_on_fetch == 0
                                            && !file_queue.is_empty()
                                            && is_refreshing)
                                    {
                                        Self::replace_with_new_batch_reader(
                                            &mut file_queue,
                                            &mut stream,
                                            self.streaming_config.clone(),
                                            &mut splits_on_fetch,
                                            source_desc.clone(),
                                            is_load_finished.clone(),
                                        )?;
                                    }
                                }
                                Message::Chunk(chunk) => {
                                    // Each row carries a (file name, persisted scan task)
                                    // assignment from the upstream list executor.
                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
                                        .data_chunk()
                                        .rows()
                                        .map(|row| {
                                            let file_name = row.datum_at(0).unwrap().into_utf8();
                                            let split = row.datum_at(1).unwrap().into_jsonb();
                                            (file_name.to_owned(), split.to_owned_scalar())
                                        })
                                        .collect();
                                    tracing::debug!(
                                        "received file assignments: {:?}",
                                        jsonb_values
                                    );
                                    file_queue.extend(jsonb_values);
                                }
                                Message::Watermark(_) => unreachable!(),
                            }
                        }
                        // A scan task finished: decrement the in-flight count, prune the
                        // hidden file-path/position columns, and forward the chunks.
                        Either::Right(ChunksWithState { chunks, .. }) => {
                            splits_on_fetch -= 1;

                            for chunk in &chunks {
                                let chunk = prune_additional_cols(
                                    chunk,
                                    &[file_path_idx, file_pos_idx],
                                    &source_desc.columns,
                                );

                                yield Message::Chunk(chunk);
                            }
                        }
                    }
                }
            }
        }
    }

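    /// Pops up to `iceberg_fetch_batch_size` queued scan tasks, decodes them,
    /// and swaps the data side of `stream` to a reader over that batch; if
    /// the queue is empty, parks the data side on a pending stream.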
    fn replace_with_new_batch_reader<const BIASED: bool>(
        file_queue: &mut VecDeque<(String, JsonbVal)>,
        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
        streaming_config: Arc<StreamingConfig>,
        splits_on_fetch: &mut usize,
        source_desc: SourceDesc,
        read_finished: Arc<RwLock<bool>>,
    ) -> StreamExecutorResult<()> {
        let mut batch =
            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
        for _ in 0..streaming_config.developer.iceberg_fetch_batch_size {
            if let Some((_, split_json)) = file_queue.pop_front() {
                batch.push(PersistedFileScanTask::decode(split_json.as_scalar_ref())?);
            } else {
                break;
            }
        }

        if batch.is_empty() {
            stream.replace_data_stream(stream::pending().boxed());
        } else {
            tracing::debug!("building batch reader with {} files", batch.len());
            *splits_on_fetch += batch.len();
            *read_finished.write() = false;
            let batch_reader = Self::build_batched_stream_reader(
                source_desc,
                batch,
                streaming_config,
                read_finished,
            );
            stream.replace_data_stream(batch_reader.boxed());
        }

        Ok(())
    }

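    /// Scans each task against the Iceberg table (applying delete files) and
    /// yields its chunks as one `ChunksWithState`, marking `read_finished`
    /// once the whole batch has been read.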
    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
    async fn build_batched_stream_reader(
        source_desc: SourceDesc,
        read_batch: Vec<FileScanTask>,
        streaming_config: Arc<StreamingConfig>,
        read_finished: Arc<RwLock<bool>>,
    ) {
        let properties = source_desc.source.config.clone();
        let properties = match properties {
            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
                iceberg_properties
            }
            _ => unreachable!(),
        };
        let table = properties.load_table().await?;

        for task in read_batch {
            let mut chunks = vec![];
            #[for_await]
            for chunk in scan_task_to_chunk_with_deletes(
                table.clone(),
                task,
                IcebergScanOpts {
                    chunk_size: streaming_config.developer.chunk_size,
                    need_seq_num: true,
                    need_file_path_and_pos: true,
                    handle_delete_files: true,
                },
                None,
            ) {
                let chunk = chunk?;

                // All rows read from a data file are inserts.
                chunks.push(StreamChunk::from_parts(
                    itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec(),
                    chunk,
                ));
            }
            yield ChunksWithState {
                chunks,
                data_file_path: "".to_owned(),
                last_read_pos: None,
            };
        }

        *read_finished.write() = true;
    }
}

impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(core) = &self.stream_source_core {
            f.debug_struct("BatchIcebergFetchExecutor")
                .field("source_id", &core.source_id)
                .field("column_ids", &core.column_ids)
                .finish()
        } else {
            f.debug_struct("BatchIcebergFetchExecutor").finish()
        }
    }
}