// risingwave_stream/executor/source/iceberg_list_executor.rs

use anyhow::{Context, anyhow};
16use either::Either;
17use futures_async_stream::try_stream;
18use iceberg::scan::FileScanTask;
19use parking_lot::Mutex;
20use risingwave_common::array::Op;
21use risingwave_common::catalog::ColumnCatalog;
22use risingwave_common::config::StreamingConfig;
23use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
24use risingwave_connector::source::ConnectorProperties;
25use risingwave_connector::source::iceberg::IcebergProperties;
26use risingwave_connector::source::reader::desc::SourceDescBuilder;
27use thiserror_ext::AsReport;
28use tokio::sync::mpsc::UnboundedReceiver;
29
30use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
31use crate::executor::prelude::*;
32use crate::executor::stream_reader::StreamReaderWithPause;
33
/// Executor that periodically lists the files of an Iceberg table and emits one row
/// per discovered [`FileScanTask`] (file path + JSON-encoded task) for downstream
/// fetch executors to consume.
pub struct IcebergListExecutor<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Core shared state of the streaming source, including the state table used to
    /// persist the last enumerated snapshot id.
    stream_source_core: StreamSourceCore<S>,

    /// Columns required by downstream; used to prune the iceberg scan projection.
    /// `None` means no pruning (select all columns).
    downstream_columns: Option<Vec<ColumnCatalog>>,

    /// Metrics for monitoring; currently unused in this executor.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel. Taken (set to `None`) when the stream starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System params reader; currently unused in this executor.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s; currently unused in this executor.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,

    /// Streaming config, read for chunk size and the iceberg list interval.
    streaming_config: Arc<StreamingConfig>,
}
62
impl<S: StateStore> IcebergListExecutor<S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        stream_source_core: StreamSourceCore<S>,
        downstream_columns: Option<Vec<ColumnCatalog>>,
        metrics: Arc<StreamingMetrics>,
        barrier_receiver: UnboundedReceiver<Barrier>,
        system_params: SystemParamsReaderRef,
        rate_limit_rps: Option<u32>,
        streaming_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            actor_ctx,
            stream_source_core,
            downstream_columns,
            metrics,
            barrier_receiver: Some(barrier_receiver),
            system_params,
            rate_limit_rps,
            streaming_config,
        }
    }

    /// Main loop of the executor:
    /// 1. wait for the first barrier and recover the last enumerated snapshot id from
    ///    the (single-value) state table;
    /// 2. if there is no recovered snapshot, do a one-off full scan of the current
    ///    snapshot and emit every file scan task;
    /// 3. then merge the barrier stream with an endless incremental-scan stream,
    ///    persisting the latest fully-enumerated snapshot id on each barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
        let first_barrier = barrier_receiver
            .recv()
            .instrument_await("source_recv_first_barrier")
            .await
            .ok_or_else(|| {
                anyhow!(
                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                    self.actor_ctx.id,
                    self.stream_source_core.source_id
                )
            })?;
        let first_epoch = first_barrier.epoch;

        let source_desc_builder: SourceDescBuilder =
            self.stream_source_core.source_desc_builder.take().unwrap();

        // This executor only ever serves iceberg sources, so any other connector
        // type here is a planner bug.
        let properties = source_desc_builder.with_properties();
        let config = ConnectorProperties::extract(properties, false)?;
        let ConnectorProperties::Iceberg(iceberg_properties) = config else {
            unreachable!()
        };

        // Project the scan to the non-hidden downstream column names, if provided.
        let downstream_columns = self.downstream_columns.map(|columns| {
            columns
                .iter()
                .filter_map(|col| {
                    if col.is_hidden() {
                        None
                    } else {
                        Some(col.name().to_owned())
                    }
                })
                .collect::<Vec<_>>()
        });

        tracing::debug!("downstream_columns: {:?}", downstream_columns);

        // The first barrier must be yielded before any data so downstream can
        // initialize against its epoch.
        yield Message::Barrier(first_barrier);
        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

        // Recover the last enumerated snapshot id (if any) from the one-value
        // state table.
        let state_table = self.stream_source_core.split_state_store.state_table_mut();
        state_table.init_epoch(first_epoch).await?;
        let state_row = state_table.get_from_one_value_table().await?;
        let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
        // Tracks what is currently persisted, so we know whether the barrier
        // handler should `insert` or `update` the state row.
        let mut prev_persisted_snapshot = last_snapshot;

        // First run (nothing recovered): enumerate the current snapshot in full.
        if last_snapshot.is_none() {
            let table = iceberg_properties.load_table().await?;
            // An empty table has no current snapshot; in that case we emit nothing
            // and leave `last_snapshot` as `None`.
            if let Some(start_snapshot) = table.metadata().current_snapshot() {
                last_snapshot = Some(start_snapshot.snapshot_id());
                let snapshot_scan_builder = table.scan().snapshot_id(start_snapshot.snapshot_id());

                let snapshot_scan = if let Some(ref downstream_columns) = downstream_columns {
                    snapshot_scan_builder.select(downstream_columns)
                } else {
                    snapshot_scan_builder.select_all()
                }
                .build()
                .context("failed to build iceberg scan")?;

                // Output schema: (file path, JSON-encoded scan task).
                let mut chunk_builder = StreamChunkBuilder::new(
                    self.streaming_config.developer.chunk_size,
                    vec![DataType::Varchar, DataType::Jsonb],
                );
                #[for_await]
                for scan_task in snapshot_scan
                    .plan_files()
                    .await
                    .context("failed to plan iceberg files")?
                {
                    let scan_task = scan_task.context("failed to get scan task")?;
                    if let Some(chunk) = chunk_builder.append_row(
                        Op::Insert,
                        &[
                            Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                            Some(ScalarImpl::Jsonb(
                                serde_json::to_value(scan_task).unwrap().into(),
                            )),
                        ],
                    ) {
                        yield Message::Chunk(chunk);
                    }
                }
                // Flush the partially-filled last chunk.
                if let Some(chunk) = chunk_builder.take() {
                    yield Message::Chunk(chunk);
                }
            }
        }

        // Share the snapshot cursor with the incremental scan stream: the stream
        // advances it only after a snapshot is fully enumerated, and the barrier
        // handler below reads it to persist checkpoint state.
        let last_snapshot = Arc::new(Mutex::new(last_snapshot));
        let incremental_scan_stream = incremental_scan_stream(
            *iceberg_properties,
            last_snapshot.clone(),
            self.streaming_config.developer.iceberg_list_interval_sec,
            downstream_columns,
        )
        .map(|res| match res {
            // Wrap each discovered file scan task into a single-row chunk.
            Ok(scan_task) => {
                let row = (
                    Op::Insert,
                    OwnedRow::new(vec![
                        Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
                        Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
                    ]),
                );
                Ok(StreamChunk::from_rows(
                    &[row],
                    &[DataType::Varchar, DataType::Jsonb],
                ))
            }
            Err(e) => Err(e),
        });

        // Merge barriers (left) with data (right); barriers can pause/resume the
        // data side.
        let mut stream =
            StreamReaderWithPause::<true, _>::new(barrier_stream, incremental_scan_stream);

        while let Some(msg) = stream.next().await {
            match msg {
                Err(e) => {
                    // NOTE(review): scan errors are only logged and the loop keeps
                    // going; the failed listing round is retried on the next tick.
                    tracing::warn!(error = %e.as_report(), "encountered an error");
                }
                Ok(msg) => match msg {
                    Either::Left(msg) => match &msg {
                        Message::Barrier(barrier) => {
                            if let Some(mutation) = barrier.mutation.as_deref() {
                                match mutation {
                                    Mutation::Pause => stream.pause_stream(),
                                    Mutation::Resume => stream.resume_stream(),
                                    _ => (),
                                }
                            }
                            // Persist the latest fully-enumerated snapshot id.
                            // `insert` on the first checkpoint, `update` afterwards.
                            if let Some(last_snapshot) = *last_snapshot.lock() {
                                let state_row =
                                    OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
                                if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
                                    let prev_state_row = OwnedRow::new(vec![
                                        ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
                                    ]);
                                    state_table.update(prev_state_row, state_row);
                                } else {
                                    state_table.insert(state_row);
                                }
                                prev_persisted_snapshot = Some(last_snapshot);
                            }
                            state_table
                                .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                                .await?;
                            yield msg;
                        }
                        // The left side is built by `barrier_to_message_stream`,
                        // which presumably yields only barriers — hence unreachable.
                        _ => unreachable!(),
                    },
                    Either::Right(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                },
            }
        }
    }
}
262
/// Endless stream that polls the iceberg table every `list_interval_sec` seconds and
/// yields the [`FileScanTask`]s added between the last enumerated snapshot and the
/// table's current snapshot.
///
/// `last_snapshot_lock` is shared with the executor's barrier handler. It is only
/// advanced *after* a snapshot diff has been fully yielded, so a checkpoint never
/// records a snapshot whose files were only partially emitted. The `parking_lot`
/// guard is dropped within each expression, so it is never held across an `.await`.
#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
async fn incremental_scan_stream(
    iceberg_properties: IcebergProperties,
    last_snapshot_lock: Arc<Mutex<Option<i64>>>,
    list_interval_sec: u64,
    downstream_columns: Option<Vec<String>>,
) {
    // Local cursor; the shared slot is only written back after a full enumeration.
    let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
    loop {
        // Sleep first: the initial snapshot (if any) was already handled by the caller.
        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;

        // Reload to pick up new snapshots committed since the previous round.
        let table = iceberg_properties.load_table().await?;

        let Some(current_snapshot) = table.metadata().current_snapshot() else {
            tracing::info!("Skip incremental scan because table is empty");
            continue;
        };

        // Nothing new since the last round.
        if Some(current_snapshot.snapshot_id()) == last_snapshot {
            tracing::info!(
                "Current table snapshot is already enumerated: {}, no new snapshot available",
                current_snapshot.snapshot_id()
            );
            continue;
        }

        // Scan the (last_snapshot, current_snapshot] range; without a lower bound
        // this scans from the beginning of the table history.
        let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
        if let Some(last_snapshot) = last_snapshot {
            incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
        }
        let incremental_scan = if let Some(ref downstream_columns) = downstream_columns {
            incremental_scan.select(downstream_columns)
        } else {
            incremental_scan.select_all()
        }
        .build()
        .context("failed to build iceberg scan")?;

        #[for_await]
        for scan_task in incremental_scan
            .plan_files()
            .await
            .context("failed to plan iceberg files")?
        {
            yield scan_task.context("failed to get scan task")?;
        }

        // Advance the cursor only after every task above has been yielded, then
        // publish it so the executor can checkpoint it on the next barrier.
        last_snapshot = Some(current_snapshot.snapshot_id());
        *last_snapshot_lock.lock() = last_snapshot;
    }
}
318
319impl<S: StateStore> Execute for IcebergListExecutor<S> {
320 fn execute(self: Box<Self>) -> BoxedMessageStream {
321 self.into_stream().boxed()
322 }
323}
324
325impl<S: StateStore> Debug for IcebergListExecutor<S> {
326 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct("IcebergListExecutor")
328 .field("source_id", &self.stream_source_core.source_id)
329 .field("column_ids", &self.stream_source_core.column_ids)
330 .finish()
331 }
332}