// risingwave_stream/executor/source/iceberg_list_executor.rs

use std::sync::Arc;

use anyhow::{Context, anyhow};
use either::Either;
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use iceberg::spec::DataContentType;
use parking_lot::Mutex;
use risingwave_common::array::Op;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::config::StreamingConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::source::iceberg::IcebergProperties;
use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;

use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// Executor that enumerates the data files of an Iceberg table and emits each
/// file downstream as a `(file_path: Varchar, scan_task: Jsonb)` row, first via
/// a full scan of the current snapshot and afterwards via periodic incremental
/// listing between snapshots (see `incremental_scan_stream`).
pub struct IcebergListExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Shared source state: source id/name, the split state store used to
    /// persist the last enumerated snapshot id, and the source desc builder.
    stream_source_core: StreamSourceCore<S>,

    /// Columns requested by the downstream; hidden columns are filtered out
    /// before building the Iceberg scan. `None` selects all table columns.
    downstream_columns: Option<Vec<ColumnCatalog>>,

    /// Streaming metrics handle. Currently unused by this executor.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Barrier channel; wrapped in `Option` so `into_stream` can `take()` it
    /// exactly once.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System params reader. Currently unused by this executor.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit (rows per second, presumably). Currently unused.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,

    /// Provides the chunk size and the iceberg list interval used below.
    streaming_config: Arc<StreamingConfig>,
}
66
impl<S: StateStore> IcebergListExecutor<S> {
    /// Plain field-by-field constructor. The barrier receiver is stored as
    /// `Some(..)` so that [`Self::into_stream`] can `take()` it exactly once.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        downstream_columns: Option<Vec<ColumnCatalog>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            downstream_columns,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            streaming_config,
        }
    }

    /// Main loop of the executor:
    ///
    /// 1. Receives and forwards the first barrier.
    /// 2. Restores the last enumerated snapshot id from the single-value state
    ///    table. If there is none (fresh start), full-scans the table's current
    ///    snapshot and emits every planned file as a chunk row.
    /// 3. Merges the barrier stream with an incremental listing stream. File
    ///    chunks are forwarded downstream; on each barrier the in-memory
    ///    snapshot watermark is persisted to the state table. If the listing
    ///    stream errors, it is logged, counted, and rebuilt instead of killing
    ///    the actor.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;

        let source_desc_builder: SourceDescBuilder =
            self.stream_source_core.source_desc_builder.take().unwrap();

        let properties = source_desc_builder.with_properties();
        let config = ConnectorProperties::extract(properties, false)?;
        // Only Iceberg properties are expected for this executor.
        let ConnectorProperties::Iceberg(iceberg_properties) = config else {
            unreachable!()
        };

        // Keep only the names of the non-hidden downstream columns; these are
        // passed to the scan builder's `select(..)`. `None` => select all.
        let downstream_columns = self.downstream_columns.map(|columns| {
            columns
                .iter()
                .filter_map(|col| {
                    if col.is_hidden() {
                        None
                    } else {
                        Some(col.name().to_owned())
                    }
                })
                .collect::<Vec<_>>()
        });

        tracing::debug!("downstream_columns: {:?}", downstream_columns);

        yield Message::Barrier(first_barrier);
        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

        // The state table stores a single value: the last snapshot id whose
        // files have been fully enumerated.
        let state_table = self.stream_source_core.split_state_store.state_table_mut();
        state_table.init_epoch(first_epoch).await?;
        let state_row = state_table.get_from_one_value_table().await?;
        let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
        // Tracks what is actually persisted, to decide insert vs. update below.
        let mut prev_persisted_snapshot = last_snapshot;

        // Fresh start: emit a full scan of the current snapshot (if any) before
        // switching to incremental listing.
        if last_snapshot.is_none() {
            let table = iceberg_properties.load_table().await?;
            if let Some(start_snapshot) = table.metadata().current_snapshot() {
                last_snapshot = Some(start_snapshot.snapshot_id());
                let snapshot_scan_builder = table.scan().snapshot_id(start_snapshot.snapshot_id());

                let snapshot_scan = if let Some(ref downstream_columns) = downstream_columns {
                    snapshot_scan_builder.select(downstream_columns)
                } else {
                    snapshot_scan_builder.select_all()
                }
                .build()
                .context("failed to build iceberg scan")?;

                // Row layout: [data_file_path: Varchar, scan_task: Jsonb].
                let mut chunk_builder = StreamChunkBuilder::new(
                    self.streaming_config.developer.chunk_size,
                    vec![DataType::Varchar, DataType::Jsonb],
                );
                #[for_await]
                for scan_task in snapshot_scan
                    .plan_files()
                    .await
                    .context("failed to plan iceberg files")?
                {
                    let scan_task = scan_task.context("failed to get scan task")?;
                    if let Some(chunk) = chunk_builder.append_row(
                        Op::Insert,
                        &[
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(
                                serde_json::to_value(scan_task).unwrap().into(),
                            )),
                        ],
                    ) {
                        // Builder is full — flush the completed chunk.
                        yield Message::Chunk(chunk);
                    }
                }
                // Flush the trailing partial chunk, if any.
                if let Some(chunk) = chunk_builder.take() {
                    yield Message::Chunk(chunk);
                }
            }
        }

        let source_id_str = self.stream_source_core.source_id.to_string();
        let source_name_str = self.stream_source_core.source_name.clone();
        let list_table_name = iceberg_properties.table.table_name().to_owned();
        let list_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;

        // Shared with `incremental_scan_stream`, which advances it after each
        // completed incremental scan; read here on every barrier to persist.
        let last_snapshot = Arc::new(Mutex::new(last_snapshot));
        let build_incremental_stream = || {
            incremental_scan_stream(
                (*iceberg_properties).clone(),
                last_snapshot.clone(),
                self.streaming_config.developer.iceberg_list_interval_sec,
                downstream_columns.clone(),
                source_id_str.clone(),
                source_name_str.clone(),
            )
            .map(|res| match res {
                Ok(scan_task) => {
                    // One single-row chunk per discovered file, same layout as
                    // the full-scan chunks above.
                    let row = (
                        Op::Insert,
                        OwnedRow::new(vec![
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
                        ]),
                    );
                    Ok(StreamChunk::from_rows(
                        &[row],
                        &[DataType::Varchar, DataType::Jsonb],
                    ))
                }
                Err(e) => Err(e),
            })
        };

        let mut stream =
            StreamReaderWithPause::<true, _>::new(barrier_stream, build_incremental_stream());

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // Don't kill the actor on a listing error: record it and
                    // rebuild the incremental stream from the shared watermark.
                    tracing::warn!(
                        error = %e.as_report(),
                        "incremental iceberg list stream errored, rebuilding"
                    );
                    list_metrics
                        .iceberg_source_scan_errors_total
                        .with_guarded_label_values(&[
                            source_id_str.as_str(),
                            source_name_str.as_str(),
                            list_table_name.as_str(),
                            "list_error",
                        ])
                        .inc();
                    stream.replace_data_stream(build_incremental_stream());
                }
                Ok(msg) => match msg {
                    Either::Left(msg) => match &msg {
                        Message::Barrier(barrier) => {
                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    _ => (),
                                }
                            }
                            // Persist the current snapshot watermark: first
                            // barrier with a watermark inserts the row, later
                            // ones update it in place.
                            if let Some(last_snapshot) = *last_snapshot.lock() {
                                let state_row =
                                    OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
                                if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
                                    let prev_state_row = OwnedRow::new(vec![
                                        ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
                                    ]);
                                    state_table.update(prev_state_row, state_row);
                                } else {
                                    state_table.insert(state_row);
                                }
                                prev_persisted_snapshot = Some(last_snapshot);
                            }
                            state_table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                            yield msg;
                        }
                        // The barrier stream only yields barriers.
                        _ => unreachable!(),
                    },
                    Either::Right(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                },
            }
        }
    }
}
288
/// Endless stream that wakes up every `list_interval_sec` seconds, compares the
/// table's current snapshot to the last enumerated one, and yields a
/// [`FileScanTask`] for every file added in between.
///
/// `last_snapshot_lock` is shared with the executor (which persists it on
/// barriers). It is advanced only after a whole incremental scan has been
/// yielded, so the persisted watermark never skips files that were not emitted;
/// a rebuild mid-scan re-lists the same range (at-least-once).
#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
async fn incremental_scan_stream(
    iceberg_properties: IcebergProperties,
    last_snapshot_lock: Arc<Mutex<Option<i64>>>,
    list_interval_sec: u64,
    downstream_columns: Option<Vec<String>>,
    source_id: String,
    source_name: String,
) {
    let metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
    let table_name = iceberg_properties.table.table_name().to_owned();
    let label_values = [
        source_id.as_str(),
        source_name.as_str(),
        table_name.as_str(),
    ];

    // Local copy of the shared watermark; the lock is only written back after
    // a full scan completes (see end of loop).
    let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
    loop {
        // Throttle: sleep first, then list.
        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;

        let table = iceberg_properties.load_table().await?;

        let Some(current_snapshot) = table.metadata().current_snapshot() else {
            tracing::info!("Skip incremental scan because table is empty");
            continue;
        };

        // Already caught up: report zero lag and wait for the next tick.
        if Some(current_snapshot.snapshot_id()) == last_snapshot {
            metrics
                .iceberg_source_snapshot_lag_seconds
                .with_guarded_label_values(&label_values)
                .set(0);

            tracing::info!(
                "Current table snapshot is already enumerated: {}, no new snapshot available",
                current_snapshot.snapshot_id()
            );
            continue;
        }

        // Report how far behind the current snapshot we are, based on snapshot
        // commit timestamps. `.max(0)` guards against non-monotonic timestamps.
        if let Some(last_snapshot_id) = last_snapshot
            && let Some(last_ingested_snapshot) = table
                .metadata()
                .snapshots()
                .find(|snapshot| snapshot.snapshot_id() == last_snapshot_id)
        {
            let lag_secs =
                (current_snapshot.timestamp_ms() - last_ingested_snapshot.timestamp_ms()).max(0)
                    / 1000;
            metrics
                .iceberg_source_snapshot_lag_seconds
                .with_guarded_label_values(&label_values)
                .set(lag_secs);
        }

        metrics
            .iceberg_source_snapshots_discovered_total
            .with_guarded_label_values(&label_values)
            .inc();

        // Incremental scan: (last_snapshot, current_snapshot]; from the table
        // start when there is no previous watermark.
        let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
        if let Some(last_snapshot) = last_snapshot {
            incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
        }
        let incremental_scan = if let Some(ref downstream_columns) = downstream_columns {
            incremental_scan.select(downstream_columns)
        } else {
            incremental_scan.select_all()
        }
        .build()
        .context("failed to build iceberg scan")?;

        // `list_duration` accumulates only the time spent planning/iterating
        // here, excluding the time the consumer holds us between yields
        // (`active_since` is reset right after each yield).
        let mut list_duration = std::time::Duration::default();
        let mut active_since = std::time::Instant::now();
        let mut data_file_count: u64 = 0;
        let mut eq_delete_count: u64 = 0;
        let mut pos_delete_count: u64 = 0;

        #[for_await]
        for scan_task in incremental_scan
            .plan_files()
            .await
            .context("failed to plan iceberg files")?
        {
            let scan_task = scan_task.context("failed to get scan task")?;

            data_file_count += 1;
            for delete_task in &scan_task.deletes {
                match delete_task.data_file_content {
                    DataContentType::EqualityDeletes => eq_delete_count += 1,
                    DataContentType::PositionDeletes => pos_delete_count += 1,
                    _ => {}
                }
            }

            metrics
                .iceberg_source_delete_files_per_data_file
                .with_guarded_label_values(&label_values)
                .observe(scan_task.deletes.len() as f64);

            list_duration += active_since.elapsed();
            yield scan_task;
            active_since = std::time::Instant::now();
        }

        list_duration += active_since.elapsed();
        metrics
            .iceberg_source_list_duration_seconds
            .with_guarded_label_values(&label_values)
            .observe(list_duration.as_secs_f64());

        // File-discovery counters, split by file kind.
        if data_file_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "data",
                ])
                .inc_by(data_file_count);
        }
        if eq_delete_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "eq_delete",
                ])
                .inc_by(eq_delete_count);
        }
        if pos_delete_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "pos_delete",
                ])
                .inc_by(pos_delete_count);
        }

        // Fully caught up to `current_snapshot` now.
        metrics
            .iceberg_source_snapshot_lag_seconds
            .with_guarded_label_values(&label_values)
            .set(0);

        // Advance the shared watermark only after the whole range was yielded.
        last_snapshot = Some(current_snapshot.snapshot_id());
        *last_snapshot_lock.lock() = last_snapshot;
    }
}
453
454impl<S: StateStore> Execute for IcebergListExecutor<S> {
455 fn execute(self: Box<Self>) -> BoxedMessageStream {
456 self.into_stream().boxed()
457 }
458}
459
460impl<S: StateStore> Debug for IcebergListExecutor<S> {
461 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462 f.debug_struct("IcebergListExecutor")
463 .field("source_id", &self.stream_source_core.source_id)
464 .field("column_ids", &self.stream_source_core.column_ids)
465 .finish()
466 }
467}