risingwave_stream/executor/source/
iceberg_list_executor.rs1use anyhow::{Context, anyhow};
16use either::Either;
17use futures_async_stream::try_stream;
18use iceberg::scan::FileScanTask;
19use parking_lot::Mutex;
20use risingwave_common::array::Op;
21use risingwave_common::config::StreamingConfig;
22use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
23use risingwave_connector::source::ConnectorProperties;
24use risingwave_connector::source::iceberg::IcebergProperties;
25use risingwave_connector::source::reader::desc::SourceDescBuilder;
26use thiserror_ext::AsReport;
27use tokio::sync::mpsc::UnboundedReceiver;
28
29use super::{PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream};
30use crate::executor::prelude::*;
31use crate::executor::stream_reader::StreamReaderWithPause;
32
33pub struct IcebergListExecutor<S: StateStore> {
34 actor_ctx: ActorContextRef,
35
36 stream_source_core: StreamSourceCore<S>,
38
39 #[expect(dead_code)]
41 metrics: Arc<StreamingMetrics>,
42
43 barrier_receiver: Option<UnboundedReceiver<Barrier>>,
45
46 #[expect(dead_code)]
48 system_params: SystemParamsReaderRef,
49
50 #[expect(dead_code)]
52 rate_limit_rps: Option<u32>,
53
54 streaming_config: Arc<StreamingConfig>,
56}
57
58impl<S: StateStore> IcebergListExecutor<S> {
59 pub fn new(
60 actor_ctx: ActorContextRef,
61 stream_source_core: StreamSourceCore<S>,
62 metrics: Arc<StreamingMetrics>,
63 barrier_receiver: UnboundedReceiver<Barrier>,
64 system_params: SystemParamsReaderRef,
65 rate_limit_rps: Option<u32>,
66 streaming_config: Arc<StreamingConfig>,
67 ) -> Self {
68 Self {
69 actor_ctx,
70 stream_source_core,
71 metrics,
72 barrier_receiver: Some(barrier_receiver),
73 system_params,
74 rate_limit_rps,
75 streaming_config,
76 }
77 }
78
79 #[try_stream(ok = Message, error = StreamExecutorError)]
80 async fn into_stream(mut self) {
81 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
82 let first_barrier = barrier_receiver
83 .recv()
84 .instrument_await("source_recv_first_barrier")
85 .await
86 .ok_or_else(|| {
87 anyhow!(
88 "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
89 self.actor_ctx.id,
90 self.stream_source_core.source_id
91 )
92 })?;
93 let first_epoch = first_barrier.epoch;
94
95 let source_desc_builder: SourceDescBuilder =
97 self.stream_source_core.source_desc_builder.take().unwrap();
98
99 let properties = source_desc_builder.with_properties();
100 let config = ConnectorProperties::extract(properties, false)?;
101 let ConnectorProperties::Iceberg(iceberg_properties) = config else {
102 unreachable!()
103 };
104
105 yield Message::Barrier(first_barrier);
106 let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
107
108 let state_table = self.stream_source_core.split_state_store.state_table_mut();
109 state_table.init_epoch(first_epoch).await?;
110 let state_row = state_table.get_from_one_value_table().await?;
111 let mut last_snapshot: Option<i64> = state_row.map(|s| *s.as_int64());
113 let mut prev_persisted_snapshot = last_snapshot;
114
115 if last_snapshot.is_none() {
116 let table = iceberg_properties.load_table().await?;
119 if let Some(start_snapshot) = table.metadata().current_snapshot() {
121 last_snapshot = Some(start_snapshot.snapshot_id());
122 let snapshot_scan = table
123 .scan()
124 .snapshot_id(start_snapshot.snapshot_id())
125 .select_all()
128 .build()
129 .context("failed to build iceberg scan")?;
130
131 let mut chunk_builder = StreamChunkBuilder::new(
132 self.streaming_config.developer.chunk_size,
133 vec![DataType::Varchar, DataType::Jsonb],
134 );
135 #[for_await]
136 for scan_task in snapshot_scan
137 .plan_files()
138 .await
139 .context("failed to plan iceberg files")?
140 {
141 let scan_task = scan_task.context("failed to get scan task")?;
142 if let Some(chunk) = chunk_builder.append_row(
143 Op::Insert,
144 &[
145 Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
146 Some(ScalarImpl::Jsonb(
147 serde_json::to_value(scan_task).unwrap().into(),
148 )),
149 ],
150 ) {
151 yield Message::Chunk(chunk);
152 }
153 }
154 if let Some(chunk) = chunk_builder.take() {
155 yield Message::Chunk(chunk);
156 }
157 }
158 }
159
160 let last_snapshot = Arc::new(Mutex::new(last_snapshot));
161 let incremental_scan_stream = incremental_scan_stream(
162 *iceberg_properties,
163 last_snapshot.clone(),
164 self.streaming_config.developer.iceberg_list_interval_sec,
165 )
166 .map(|res| match res {
167 Ok(scan_task) => {
168 let row = (
169 Op::Insert,
170 OwnedRow::new(vec![
171 Some(ScalarImpl::Utf8(scan_task.data_file_path().into())),
172 Some(ScalarImpl::Jsonb(PersistedFileScanTask::encode(scan_task))),
173 ]),
174 );
175 Ok(StreamChunk::from_rows(
176 &[row],
177 &[DataType::Varchar, DataType::Jsonb],
178 ))
179 }
180 Err(e) => Err(e),
181 });
182
183 let mut stream =
184 StreamReaderWithPause::<true, _>::new(barrier_stream, incremental_scan_stream);
185
186 while let Some(msg) = stream.next().await {
189 match msg {
190 Err(e) => {
191 tracing::warn!(error = %e.as_report(), "encountered an error");
192 }
193 Ok(msg) => match msg {
194 Either::Left(msg) => match &msg {
196 Message::Barrier(barrier) => {
197 if let Some(mutation) = barrier.mutation.as_deref() {
198 match mutation {
199 Mutation::Pause => stream.pause_stream(),
200 Mutation::Resume => stream.resume_stream(),
201 _ => (),
202 }
203 }
204 if let Some(last_snapshot) = *last_snapshot.lock() {
205 let state_row =
206 OwnedRow::new(vec![ScalarImpl::Int64(last_snapshot).into()]);
207 if let Some(prev_persisted_snapshot_id) = prev_persisted_snapshot {
208 let prev_state_row = OwnedRow::new(vec![
209 ScalarImpl::Int64(prev_persisted_snapshot_id).into(),
210 ]);
211 state_table.update(prev_state_row, state_row);
212 } else {
213 state_table.insert(state_row);
214 }
215 prev_persisted_snapshot = Some(last_snapshot);
216 }
217 state_table
218 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
219 .await?;
220 yield msg;
222 }
223 _ => unreachable!(),
225 },
226 Either::Right(chunk) => {
228 yield Message::Chunk(chunk);
229 }
230 },
231 }
232 }
233 }
234}
235
236#[try_stream(boxed, ok = FileScanTask, error = StreamExecutorError)]
238async fn incremental_scan_stream(
239 iceberg_properties: IcebergProperties,
240 last_snapshot_lock: Arc<Mutex<Option<i64>>>,
241 list_interval_sec: u64,
242) {
243 let mut last_snapshot: Option<i64> = *last_snapshot_lock.lock();
244 loop {
245 tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
246
247 let table = iceberg_properties.load_table().await?;
250
251 let Some(current_snapshot) = table.metadata().current_snapshot() else {
252 tracing::info!("Skip incremental scan because table is empty");
253 continue;
254 };
255
256 if Some(current_snapshot.snapshot_id()) == last_snapshot {
257 tracing::info!(
258 "Current table snapshot is already enumerated: {}, no new snapshot available",
259 current_snapshot.snapshot_id()
260 );
261 continue;
262 }
263
264 let mut incremental_scan = table.scan().to_snapshot_id(current_snapshot.snapshot_id());
265 if let Some(last_snapshot) = last_snapshot {
266 incremental_scan = incremental_scan.from_snapshot_id(last_snapshot);
267 }
268 let incremental_scan = incremental_scan
269 .select_all()
272 .build()
273 .context("failed to build iceberg scan")?;
274
275 #[for_await]
276 for scan_task in incremental_scan
277 .plan_files()
278 .await
279 .context("failed to plan iceberg files")?
280 {
281 yield scan_task.context("failed to get scan task")?;
282 }
283
284 last_snapshot = Some(current_snapshot.snapshot_id());
285 *last_snapshot_lock.lock() = last_snapshot;
286 }
287}
288
289impl<S: StateStore> Execute for IcebergListExecutor<S> {
290 fn execute(self: Box<Self>) -> BoxedMessageStream {
291 self.into_stream().boxed()
292 }
293}
294
295impl<S: StateStore> Debug for IcebergListExecutor<S> {
296 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
297 f.debug_struct("IcebergListExecutor")
298 .field("source_id", &self.stream_source_core.source_id)
299 .field("column_ids", &self.stream_source_core.column_ids)
300 .finish()
301 }
302}