risingwave_stream/executor/source/iceberg_list_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{Context, anyhow};
use either::Either;
use futures_async_stream::try_stream;
use iceberg::scan::FileScanTask;
use parking_lot::Mutex;
use risingwave_common::array::Op;
use risingwave_common::config::StreamingConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::source::iceberg::IcebergProperties;
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;

use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

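/// Executor that lists data files of an Iceberg table. It does a full scan of the
/// current snapshot on first run, then periodically performs incremental scans,
/// emitting one row per file as `(data file path, serialized scan task)`.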
pub struct IcebergListExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core streaming source state for the external source.
    stream_source_core: StreamSourceCore<S>,

    /// Metrics for monitoring.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,

    /// Streaming config.
    streaming_config: Arc<StreamingConfig>,
}

impl<S: StateStore> IcebergListExecutor<S> {
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            streaming_config,
        }
    }

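    /// Works in three phases:
    /// 1. Receive and propagate the first barrier, and restore the last enumerated
    ///    snapshot id from the state table.
    /// 2. On first run (no state), fully scan the current snapshot and emit one row
    ///    per data file.
    /// 3. Periodically run incremental scans from the last enumerated snapshot,
    ///    merged with the barrier stream; progress is persisted on each barrier.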
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;

        // Build source description from the builder.
        let source_desc_builder: SourceDescBuilder =
            self.stream_source_core.source_desc_builder.take().unwrap();

        let properties = source_desc_builder.with_properties();
        let config = ConnectorProperties::extract(properties, false)?;
        let ConnectorProperties::Iceberg(iceberg_properties) = config else {
            unreachable!()
        };

        yield Message::Barrier(first_barrier);
        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

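        // The state is a single-value table holding the id of the last snapshot whose
        // files have been fully enumerated; it is absent on first run.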
        let state_table = self.stream_source_core.split_state_store.state_table_mut();
        state_table.init_epoch(first_epoch).await?;
        let state_row = state_table.get_from_one_value_table().await?;
        // last_snapshot is EXCLUSIVE (i.e., already scanned)
        let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
        let mut prev_persisted_snapshot = last_snapshot;

        if last_snapshot.is_none() {
            // Do a regular full scan first, then switch to incremental scans.
            // TODO: we may support starting from a specific snapshot/timestamp later
            let table = iceberg_properties.load_table().await?;
            // If current_snapshot is None (empty table), we go to incremental scan directly.
            if let Some(start_snapshot) = table.metadata().current_snapshot() {
                last_snapshot = Some(start_snapshot.snapshot_id());
                let snapshot_scan = table
                    .scan()
                    .snapshot_id(start_snapshot.snapshot_id())
                    // TODO: col prune
                    // NOTE: selecting by name might not be robust under schema evolution
                    .select_all()
                    .build()
                    .context("failed to build iceberg scan")?;

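                // Each output row is `(data file path, serialized FileScanTask)`.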
                let mut chunk_builder = StreamChunkBuilder::new(
                    self.streaming_config.developer.chunk_size,
                    vec![DataType::Varchar, DataType::Jsonb],
                );
                #[for_await]
                for scan_task in snapshot_scan
                    .plan_files()
                    .await
                    .context("failed to plan iceberg files")?
                {
                    let scan_task = scan_task.context("failed to get scan task")?;
                    if let Some(chunk) = chunk_builder.append_row(
                        Op::Insert,
                        &[
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(
                                serde_json::to_value(scan_task).unwrap().into(),
                            )),
                        ],
                    ) {
                        yield Message::Chunk(chunk);
                    }
                }
                if let Some(chunk) = chunk_builder.take() {
                    yield Message::Chunk(chunk);
                }
            }
        }

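        // Share the snapshot cursor with the incremental scan stream: the stream
        // advances it after each completed scan, and the barrier handler below reads
        // it to persist progress to the state table.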
        let last_snapshot = Arc::new(Mutex::new(last_snapshot));
        let incremental_scan_stream = incremental_scan_stream(
            *iceberg_properties,
            last_snapshot.clone(),
            self.streaming_config.developer.iceberg_list_interval_sec,
        )
        .map(|res| match res {
            Ok(scan_task) => {
                let row = (
                    Op::Insert,
                    OwnedRow::new(vec![
                        Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                        Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
                    ]),
                );
                Ok(StreamChunk::from_rows(
                    &[row],
                    &[DataType::Varchar, DataType::Jsonb],
                ))
            }
            Err(e) => Err(e),
        });

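        // Merge the barrier stream with the data stream, so that barriers can pause
        // and resume the data side.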
        let mut stream =
            StreamReaderWithPause::<true, _>::new(barrier_stream, incremental_scan_stream);

        // TODO: support pause (incl. pause on startup)/resume/rate limit

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    tracing::warn!(error = %e.as_report(), "encountered an error");
                }
                Ok(msg) => match msg {
                    // Barrier arrives.
                    Either::Left(msg) => match &msg {
                        Message::Barrier(barrier) => {
                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    _ => (),
                                }
                            }
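                            // Persist the latest enumerated snapshot id on each barrier.
                            // The single-value state table needs the previous row to
                            // perform an update, hence `prev_persisted_snapshot`.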
                            if let Some(last_snapshot) = *last_snapshot.lock() {
                                let state_row =
                                    OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
                                if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
                                    let prev_state_row = OwnedRow::new(vec![
                                        ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
                                    ]);
                                    state_table.update(prev_state_row, state_row);
                                } else {
                                    state_table.insert(state_row);
                                }
                                prev_persisted_snapshot = Some(last_snapshot);
                            }
                            state_table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                            // Propagate the barrier.
                            yield msg;
                        }
                        // Only barrier can be received.
                        _ => unreachable!(),
                    },
                    // Data arrives.
                    Either::Right(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                },
            }
        }
    }
}

/// `last_snapshot` is EXCLUSIVE (i.e., already scanned).
#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
async fn incremental_scan_stream(
    iceberg_properties: IcebergProperties,
    last_snapshot_lock: Arc<Mutex<Option<i64>>>,
    list_interval_sec: u64,
) {
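    // Keep a local copy of the cursor: the shared lock is only advanced after all
    // files of a snapshot have been yielded, so a barrier never persists a
    // half-enumerated snapshot.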
    let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;

        // XXX: should we use something like table.refresh() instead of reloading the
        // table every time? iceberg-java does this, but iceberg-rust doesn't have
        // this API now.
        let table = iceberg_properties.load_table().await?;

        let Some(current_snapshot) = table.metadata().current_snapshot() else {
            tracing::info!("Skip incremental scan because the table is empty");
            continue;
        };

        if Some(current_snapshot.snapshot_id()) == last_snapshot {
            tracing::info!(
                "Current table snapshot {} is already enumerated, no new snapshot available",
                current_snapshot.snapshot_id()
            );
            continue;
        }

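        // Scan the range from `last_snapshot` (exclusive, per the contract above) up
        // to the current snapshot.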
        let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
        if let Some(last_snapshot) = last_snapshot {
            incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
        }
        let incremental_scan = incremental_scan
            // TODO: col prune
            // NOTE: selecting by name might not be robust under schema evolution
            .select_all()
            .build()
            .context("failed to build iceberg scan")?;

        #[for_await]
        for scan_task in incremental_scan
            .plan_files()
            .await
            .context("failed to plan iceberg files")?
        {
            yield scan_task.context("failed to get scan task")?;
        }

        last_snapshot = Some(current_snapshot.snapshot_id());
        *last_snapshot_lock.lock() = last_snapshot;
    }
}

impl<S: StateStore> Execute for IcebergListExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

impl<S: StateStore> Debug for IcebergListExecutor<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergListExecutor")
            .field("source_id", &self.stream_source_core.source_id)
            .field("column_ids", &self.stream_source_core.column_ids)
            .finish()
    }
}