// risingwave_stream/executor/source/iceberg_list_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use either::Either;
19use futures_async_stream::try_stream;
20use iceberg::scan::FileScanTask;
21use iceberg::spec::DataContentType;
22use parking_lot::Mutex;
23use risingwave_common::array::Op;
24use risingwave_common::catalog::ColumnCatalog;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_connector::source::ConnectorProperties;
28use risingwave_connector::source::iceberg::IcebergProperties;
29use risingwave_connector::source::iceberg::metrics::GLOBAL_ICEBERG_SCAN_METRICS;
30use risingwave_connector::source::reader::desc::SourceDescBuilder;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc::UnboundedReceiver;
33
34use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
35use crate::executor::prelude::*;
36use crate::executor::stream_reader::StreamReaderWithPause;
37
/// List executor for an iceberg source.
///
/// Enumerates iceberg data files — an initial full scan of the current snapshot,
/// then periodic incremental scans between snapshots — and emits each planned
/// file downstream as a `(file_path: Varchar, scan_task: Jsonb)` row, to be read
/// by the fetch executor. The last fully-enumerated snapshot id is persisted in
/// a single-row state table on each barrier.
pub struct IcebergListExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Streaming source for external
    stream_source_core: StreamSourceCore<S>,

    /// Columns of fetch executor, used to plan files.
    /// For backward compatibility, this can be None, meaning all columns are needed.
    downstream_columns: Option<Vec<ColumnCatalog>>,

    /// Metrics for monitor.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barrier channel. `Some` until taken by `into_stream`.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read barrier interval
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,

    /// Streaming config; provides chunk size and the incremental list interval.
    streaming_config: Arc<StreamingConfig>,
}
66
impl<S: StateStore> IcebergListExecutor<S> {
    /// Creates a new [`IcebergListExecutor`].
    ///
    /// `barrier_receiver` is stored as `Some(..)` and taken out exactly once in
    /// [`Self::into_stream`]. `downstream_columns` may be `None` for backward
    /// compatibility, meaning all columns are planned.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        downstream_columns: Option<Vec<ColumnCatalog>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            downstream_columns,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            streaming_config,
        }
    }

    /// Main loop of the list executor:
    ///
    /// 1. Await the first barrier, extract the iceberg connector properties, and
    ///    init the single-row state table holding the last enumerated snapshot id.
    /// 2. If no snapshot id is persisted (fresh start), do one full scan of the
    ///    table's current snapshot and emit every planned file as an `Insert` row.
    /// 3. Switch to an endless incremental scan stream merged with the barrier
    ///    stream; on each barrier, persist the newest enumerated snapshot id.
    ///
    /// On an error from the incremental stream, the stream is rebuilt in place
    /// (the barrier side keeps flowing) and an error metric is bumped.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;

        // Build source description from the builder.
        let source_desc_builder: SourceDescBuilder =
            self.stream_source_core.source_desc_builder.take().unwrap();

        let properties = source_desc_builder.with_properties();
        let config = ConnectorProperties::extract(properties, false)?;
        let ConnectorProperties::Iceberg(iceberg_properties) = config else {
            // This executor is only built for iceberg sources.
            unreachable!()
        };

        // Get consistent column names for schema stability across snapshots
        // (hidden columns are excluded from file planning).
        let downstream_columns = self.downstream_columns.map(|columns| {
            columns
                .iter()
                .filter_map(|col| {
                    if col.is_hidden() {
                        None
                    } else {
                        Some(col.name().to_owned())
                    }
                })
                .collect::<Vec<_>>()
        });

        tracing::debug!("downstream_columns: {:?}", downstream_columns);

        yield Message::Barrier(first_barrier);
        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

        // Single-value state table: the only row (if any) is the last snapshot id.
        let state_table = self.stream_source_core.split_state_store.state_table_mut();
        state_table.init_epoch(first_epoch).await?;
        let state_row = state_table.get_from_one_value_table().await?;
        // last_snapshot is EXCLUSIVE (i.e., already scanned)
        let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
        // Tracks what is currently persisted, so barrier handling knows whether
        // to `insert` (first write) or `update` (subsequent writes).
        let mut prev_persisted_snapshot = last_snapshot;

        if last_snapshot.is_none() {
            // do a regular scan, then switch to incremental scan
            // TODO: we may support starting from a specific snapshot/timestamp later
            let table = iceberg_properties.load_table().await?;
            // If current_snapshot is None (empty table), we go to incremental scan directly.
            if let Some(start_snapshot) = table.metadata().current_snapshot() {
                last_snapshot = Some(start_snapshot.snapshot_id());
                let snapshot_scan_builder = table.scan().snapshot_id(start_snapshot.snapshot_id());

                let snapshot_scan = if let Some(ref downstream_columns) = downstream_columns {
                    snapshot_scan_builder.select(downstream_columns)
                } else {
                    // for backward compatibility
                    snapshot_scan_builder.select_all()
                }
                .build()
                .context("failed to build iceberg scan")?;

                // Output schema: (file path, JSON-encoded scan task).
                let mut chunk_builder = StreamChunkBuilder::new(
                    self.streaming_config.developer.chunk_size,
                    vec![DataType::Varchar, DataType::Jsonb],
                );
                #[for_await]
                for scan_task in snapshot_scan
                    .plan_files()
                    .await
                    .context("failed to plan iceberg files")?
                {
                    let scan_task = scan_task.context("failed to get scan task")?;
                    // NOTE(review): this path serializes with `serde_json::to_value`
                    // while the incremental path uses `PersistedFileScanTask::encode`
                    // — presumably equivalent encodings; confirm they stay in sync.
                    if let Some(chunk) = chunk_builder.append_row(
                        Op::Insert,
                        &[
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(
                                serde_json::to_value(scan_task).unwrap().into(),
                            )),
                        ],
                    ) {
                        yield Message::Chunk(chunk);
                    }
                }
                // Flush the partially-filled trailing chunk, if any.
                if let Some(chunk) = chunk_builder.take() {
                    yield Message::Chunk(chunk);
                }
            }
        }

        let source_id_str = self.stream_source_core.source_id.to_string();
        let source_name_str = self.stream_source_core.source_name.clone();
        let list_table_name = iceberg_properties.table.table_name().to_owned();
        let list_metrics = &GLOBAL_ICEBERG_SCAN_METRICS;

        // Shared with `incremental_scan_stream`, which advances it after each
        // completed scan; the barrier handler below reads it for persistence.
        let last_snapshot = Arc::new(Mutex::new(last_snapshot));
        let build_incremental_stream = || {
            incremental_scan_stream(
                (*iceberg_properties).clone(),
                last_snapshot.clone(),
                self.streaming_config.developer.iceberg_list_interval_sec,
                downstream_columns.clone(),
                source_id_str.clone(),
                source_name_str.clone(),
            )
            .map(|res| match res {
                Ok(scan_task) => {
                    // One chunk per file; each row is (path, encoded scan task).
                    let row = (
                        Op::Insert,
                        OwnedRow::new(vec![
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
                        ]),
                    );
                    Ok(StreamChunk::from_rows(
                        &[row],
                        &[DataType::Varchar, DataType::Jsonb],
                    ))
                }
                Err(e) => Err(e),
            })
        };

        let mut stream =
            StreamReaderWithPause::<true, _>::new(barrier_stream, build_incremental_stream());

        // TODO: support pause (incl. pause on startup)/resume/rate limit

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // The incremental side errored: log, count, and rebuild it.
                    // The rebuilt stream resumes from the shared `last_snapshot`.
                    tracing::warn!(
                        error = %e.as_report(),
                        "incremental iceberg list stream errored, rebuilding"
                    );
                    list_metrics
                        .iceberg_source_scan_errors_total
                        .with_guarded_label_values(&[
                            source_id_str.as_str(),
                            source_name_str.as_str(),
                            list_table_name.as_str(),
                            "list_error",
                        ])
                        .inc();
                    stream.replace_data_stream(build_incremental_stream());
                }
                Ok(msg) => match msg {
                    // Barrier arrives.
                    Either::Left(msg) => match &msg {
                        Message::Barrier(barrier) => {
                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    _ => (),
                                }
                            }
                            // Persist the newest enumerated snapshot id, if any.
                            // Insert on first write, update afterwards (value table).
                            if let Some(last_snapshot) = *last_snapshot.lock() {
                                let state_row =
                                    OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
                                if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
                                    let prev_state_row = OwnedRow::new(vec![
                                        ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
                                    ]);
                                    state_table.update(prev_state_row, state_row);
                                } else {
                                    state_table.insert(state_row);
                                }
                                prev_persisted_snapshot = Some(last_snapshot);
                            }
                            state_table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                            // Propagate the barrier.
                            yield msg;
                        }
                        // Only barrier can be received.
                        _ => unreachable!(),
                    },
                    // Data arrives.
                    Either::Right(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                },
            }
        }
    }
}
288
/// `last_snapshot` is EXCLUSIVE (i.e., already scanned)
///
/// Endless stream of [`FileScanTask`]s discovered by polling the iceberg table
/// every `list_interval_sec` seconds. Each round reloads the table, and if the
/// current snapshot differs from `last_snapshot`, plans an incremental scan
/// `(last_snapshot, current_snapshot]` (or a scan up to the current snapshot
/// when `last_snapshot` is `None`) and yields every planned task.
///
/// `last_snapshot_lock` is only written AFTER a round completes successfully,
/// so a caller that rebuilds this stream after an error re-scans from the last
/// fully-enumerated snapshot. Metrics (lag, discovered snapshots/files, list
/// duration) are reported along the way.
#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
async fn incremental_scan_stream(
    iceberg_properties: IcebergProperties,
    last_snapshot_lock: Arc<Mutex<Option<i64>>>,
    list_interval_sec: u64,
    downstream_columns: Option<Vec<String>>,
    source_id: String,
    source_name: String,
) {
    let metrics = &GLOBAL_ICEBERG_SCAN_METRICS;
    let table_name = iceberg_properties.table.table_name().to_owned();
    let label_values = [
        source_id.as_str(),
        source_name.as_str(),
        table_name.as_str(),
    ];

    // Local cursor, synced back to the shared lock at the end of each round.
    let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;

        // XXX: should we use sth like table.refresh() instead of reload the table every time?
        // iceberg-java does this, but iceberg-rust doesn't have this API now.
        let table = iceberg_properties.load_table().await?;

        let Some(current_snapshot) = table.metadata().current_snapshot() else {
            tracing::info!("Skip incremental scan because table is empty");
            continue;
        };

        if Some(current_snapshot.snapshot_id()) == last_snapshot {
            // Ingestion is caught up — lag is 0.
            metrics
                .iceberg_source_snapshot_lag_seconds
                .with_guarded_label_values(&label_values)
                .set(0);

            tracing::info!(
                "Current table snapshot is already enumerated: {}, no new snapshot available",
                current_snapshot.snapshot_id()
            );
            continue;
        }

        // Report snapshot lag as the timestamp gap between the table's current
        // snapshot and the last one we ingested, clamped at 0 (ms -> s).
        // NOTE(review): assumes the last ingested snapshot is still present in
        // table metadata (not expired) — otherwise lag is simply not reported.
        if let Some(last_snapshot_id) = last_snapshot
            && let Some(last_ingested_snapshot) = table
                .metadata()
                .snapshots()
                .find(|snapshot| snapshot.snapshot_id() == last_snapshot_id)
        {
            let lag_secs =
                (current_snapshot.timestamp_ms() - last_ingested_snapshot.timestamp_ms()).max(0)
                    / 1000;
            metrics
                .iceberg_source_snapshot_lag_seconds
                .with_guarded_label_values(&label_values)
                .set(lag_secs);
        }

        // New snapshot discovered.
        metrics
            .iceberg_source_snapshots_discovered_total
            .with_guarded_label_values(&label_values)
            .inc();

        // Incremental scan bounds: (last_snapshot, current_snapshot].
        let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
        if let Some(last_snapshot) = last_snapshot {
            incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
        }
        let incremental_scan = if let Some(ref downstream_columns) = downstream_columns {
            incremental_scan.select(downstream_columns)
        } else {
            // for backward compatibility
            incremental_scan.select_all()
        }
        .build()
        .context("failed to build iceberg scan")?;

        // Measure only time spent listing/planning, excluding time the consumer
        // spends between our `yield` and the next poll (hence the active_since
        // reset around each yield).
        let mut list_duration = std::time::Duration::default();
        let mut active_since = std::time::Instant::now();
        let mut data_file_count: u64 = 0;
        let mut eq_delete_count: u64 = 0;
        let mut pos_delete_count: u64 = 0;

        #[for_await]
        for scan_task in incremental_scan
            .plan_files()
            .await
            .context("failed to plan iceberg files")?
        {
            let scan_task = scan_task.context("failed to get scan task")?;

            // Count file types: the main task is a data file, deletes are attached.
            data_file_count += 1;
            for delete_task in &scan_task.deletes {
                match delete_task.data_file_content {
                    DataContentType::EqualityDeletes => eq_delete_count += 1,
                    DataContentType::PositionDeletes => pos_delete_count += 1,
                    _ => {}
                }
            }

            // Record delete files per data file.
            metrics
                .iceberg_source_delete_files_per_data_file
                .with_guarded_label_values(&label_values)
                .observe(scan_task.deletes.len() as f64);

            list_duration += active_since.elapsed();
            yield scan_task;
            active_since = std::time::Instant::now();
        }

        list_duration += active_since.elapsed();
        metrics
            .iceberg_source_list_duration_seconds
            .with_guarded_label_values(&label_values)
            .observe(list_duration.as_secs_f64());

        // Per-file-type discovery counters, labeled with an extra "type" value.
        if data_file_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "data",
                ])
                .inc_by(data_file_count);
        }
        if eq_delete_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "eq_delete",
                ])
                .inc_by(eq_delete_count);
        }
        if pos_delete_count > 0 {
            metrics
                .iceberg_source_files_discovered_total
                .with_guarded_label_values(&[
                    label_values[0],
                    label_values[1],
                    label_values[2],
                    "pos_delete",
                ])
                .inc_by(pos_delete_count);
        }

        // This scan has fully ingested the latest snapshot we observed, so lag returns to 0.
        metrics
            .iceberg_source_snapshot_lag_seconds
            .with_guarded_label_values(&label_values)
            .set(0);

        // Only now advance the shared cursor: a crash/rebuild before this point
        // re-enumerates the round instead of losing files.
        last_snapshot = Some(current_snapshot.snapshot_id());
        *last_snapshot_lock.lock() = last_snapshot;
    }
}
453
454impl<S: StateStore> Execute for IcebergListExecutor<S> {
455    fn execute(self: Box<Self>) -> BoxedMessageStream {
456        self.into_stream().boxed()
457    }
458}
459
460impl<S: StateStore> Debug for IcebergListExecutor<S> {
461    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
462        f.debug_struct("IcebergListExecutor")
463            .field("source_id", &self.stream_source_core.source_id)
464            .field("column_ids", &self.stream_source_core.column_ids)
465            .finish()
466    }
467}