risingwave_stream/executor/source/iceberg_list_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{Context, anyhow};
use either::Either;
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use parking_lot::Mutex;
use risingwave_common::array::Op;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::config::StreamingConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::source::iceberg::IcebergProperties;
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;

use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

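/// Executor that enumerates data files of an Apache Iceberg table and emits
/// them downstream as `(data_file_path, serialized file scan task)` rows.
/// On first run it scans the table's current snapshot in full, then switches
/// to periodic incremental scans between snapshots.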
pub struct IcebergListExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core state of the streaming source for the external connector.
    stream_source_core: StreamSourceCore<S>,

    /// Columns of the downstream fetch executor, used to plan files.
    /// For backward compatibility, this can be `None`, meaning all columns are needed.
    downstream_columns: Option<Vec<ColumnCatalog>>,

    /// Metrics for monitoring.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,

    /// Streaming config.
    streaming_config: Arc<StreamingConfig>,
}

impl<S: StateStore> IcebergListExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        downstream_columns: Option<Vec<ColumnCatalog>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            downstream_columns,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            streaming_config,
        }
    }

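    /// Main loop: wait for the first barrier, restore the persisted snapshot
    /// state, run a full scan of the current snapshot if starting fresh, and
    /// then merge barriers with the incremental scan stream.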
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;

        // Build source description from the builder.
        let source_desc_builder: SourceDescBuilder =
            self.stream_source_core.source_desc_builder.take().unwrap();

        let properties = source_desc_builder.with_properties();
        let config = ConnectorProperties::extract(properties, false)?;
        let ConnectorProperties::Iceberg(iceberg_properties) = config else {
            unreachable!()
        };

        // Get consistent column names for schema stability across snapshots.
        let downstream_columns = self.downstream_columns.map(|columns| {
            columns
                .iter()
                .filter_map(|col| {
                    if col.is_hidden() {
                        None
                    } else {
                        Some(col.name().to_owned())
                    }
                })
                .collect::<Vec<_>>()
        });

        tracing::debug!("downstream_columns: {:?}", downstream_columns);

        yield Message::Barrier(first_barrier);
        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

        let state_table = self.stream_source_core.split_state_store.state_table_mut();
        state_table.init_epoch(first_epoch).await?;
        let state_row = state_table.get_from_one_value_table().await?;
        // `last_snapshot` is EXCLUSIVE (i.e., already scanned).
        let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
        let mut prev_persisted_snapshot = last_snapshot;

        if last_snapshot.is_none() {
            // Do a regular scan, then switch to incremental scan.
            // TODO: we may support starting from a specific snapshot/timestamp later.
            let table = iceberg_properties.load_table().await?;
            // If current_snapshot is None (empty table), we go to incremental scan directly.
            if let Some(start_snapshot) = table.metadata().current_snapshot() {
                last_snapshot = Some(start_snapshot.snapshot_id());
                let snapshot_scan_builder = table.scan().snapshot_id(start_snapshot.snapshot_id());

                let snapshot_scan = if let Some(ref downstream_columns) = downstream_columns {
                    snapshot_scan_builder.select(downstream_columns)
                } else {
                    // for backward compatibility
                    snapshot_scan_builder.select_all()
                }
                .build()
                .context("failed to build iceberg scan")?;

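                // Each output row is (data_file_path: Varchar, file_scan_task: Jsonb),
                // the schema consumed by the downstream fetch executor.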
                let mut chunk_builder = StreamChunkBuilder::new(
                    self.streaming_config.developer.chunk_size,
                    vec![DataType::Varchar, DataType::Jsonb],
                );
                #[for_await]
                for scan_task in snapshot_scan
                    .plan_files()
                    .await
                    .context("failed to plan iceberg files")?
                {
                    let scan_task = scan_task.context("failed to get scan task")?;
                    if let Some(chunk) = chunk_builder.append_row(
                        Op::Insert,
                        &[
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(
                                serde_json::to_value(scan_task).unwrap().into(),
                            )),
                        ],
                    ) {
                        yield Message::Chunk(chunk);
                    }
                }
                if let Some(chunk) = chunk_builder.take() {
                    yield Message::Chunk(chunk);
                }
            }
        }

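        // Share the last-enumerated snapshot id with the incremental scan stream,
        // so the barrier handler below can read it back and persist it to the
        // state table.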
        let last_snapshot = Arc::new(Mutex::new(last_snapshot));
        let incremental_scan_stream = incremental_scan_stream(
            *iceberg_properties,
            last_snapshot.clone(),
            self.streaming_config.developer.iceberg_list_interval_sec,
            downstream_columns,
        )
        .map(|res| match res {
            Ok(scan_task) => {
                let row = (
                    Op::Insert,
                    OwnedRow::new(vec![
                        Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                        Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
                    ]),
                );
                Ok(StreamChunk::from_rows(
                    &[row],
                    &[DataType::Varchar, DataType::Jsonb],
                ))
            }
            Err(e) => Err(e),
        });

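        // Merge the barrier stream with the incremental scan stream
        // (biased towards barriers, per the `true` const parameter).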
        let mut stream =
            StreamReaderWithPause::<true, _>::new(barrier_stream, incremental_scan_stream);

        // TODO: support pause (incl. pause on startup)/resume/rate limit

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::warn!(error = %e.as_report(), "encountered an error");
                }
                Ok(msg) => match msg {
                    // Barrier arrives.
                    Either::Left(msg) => match &msg {
                        Message::Barrier(barrier) => {
                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    _ => (),
                                }
                            }
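                            // Persist the latest enumerated snapshot id, so that
                            // recovery resumes the incremental scan from it.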
                            if let Some(last_snapshot) = *last_snapshot.lock() {
                                let state_row =
                                    OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
                                if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
                                    let prev_state_row = OwnedRow::new(vec![
                                        ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
                                    ]);
                                    state_table.update(prev_state_row, state_row);
                                } else {
                                    state_table.insert(state_row);
                                }
                                prev_persisted_snapshot = Some(last_snapshot);
                            }
                            state_table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                            // Propagate the barrier.
                            yield msg;
                        }
                        // Only barrier can be received.
                        _ => unreachable!(),
                    },
                    // Data arrives.
                    Either::Right(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                },
            }
        }
    }
}

/// `last_snapshot` is EXCLUSIVE (i.e., already scanned).
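///
/// Polls the table every `list_interval_sec` seconds and yields a `FileScanTask`
/// for each data file added between `last_snapshot` and the table's current
/// snapshot, then advances the shared `last_snapshot_lock`.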
#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
async fn incremental_scan_stream(
    iceberg_properties: IcebergProperties,
    last_snapshot_lock: Arc<Mutex<Option<i64>>>,
    list_interval_sec: u64,
    downstream_columns: Option<Vec<String>>,
) {
    let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;

        // XXX: should we use something like table.refresh() instead of reloading the table
        // every time? iceberg-java does this, but iceberg-rust doesn't have such an API yet.
        let table = iceberg_properties.load_table().await?;

        let Some(current_snapshot) = table.metadata().current_snapshot() else {
            tracing::info!("Skip incremental scan because table is empty");
            continue;
        };

        if Some(current_snapshot.snapshot_id()) == last_snapshot {
            tracing::info!(
                "Current table snapshot is already enumerated: {}, no new snapshot available",
                current_snapshot.snapshot_id()
            );
            continue;
        }

        let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
        if let Some(last_snapshot) = last_snapshot {
            incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
        }
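        // `last_snapshot` has already been scanned, so the range starts after it;
        // without a starting snapshot, the scan covers the full table history.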
        let incremental_scan = if let Some(ref downstream_columns) = downstream_columns {
            incremental_scan.select(downstream_columns)
        } else {
            // for backward compatibility
            incremental_scan.select_all()
        }
        .build()
        .context("failed to build iceberg scan")?;

        #[for_await]
        for scan_task in incremental_scan
            .plan_files()
            .await
            .context("failed to plan iceberg files")?
        {
            yield scan_task.context("failed to get scan task")?;
        }

        last_snapshot = Some(current_snapshot.snapshot_id());
        *last_snapshot_lock.lock() = last_snapshot;
    }
}

impl<S: StateStore> Execute for IcebergListExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergListExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergListExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}