risingwave_stream/executor/source/batch_source/
batch_iceberg_fetch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use either::Either;
18use futures::stream;
19use iceberg::scan::FileScanTask;
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::array::Op;
23use risingwave_common::catalog::{ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME};
24use risingwave_common::config::StreamingConfig;
25use risingwave_common::id::TableId;
26use risingwave_common::types::{JsonbVal, Scalar, ScalarRef};
27use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};
28use risingwave_connector::source::reader::desc::SourceDesc;
29use thiserror_ext::AsReport;
30
31use crate::executor::prelude::*;
32use crate::executor::source::{
33    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
34};
35use crate::executor::stream_reader::StreamReaderWithPause;
36use crate::task::LocalBarrierManager;
37
/// Executor that receives Iceberg file assignments from an upstream list executor,
/// reads the assigned files in batches, and emits their rows downstream.
///
/// It also tracks refresh progress (`RefreshStart` / `ListFinish` barrier mutations) and
/// reports to the barrier manager once every assigned file has been loaded.
pub struct BatchIcebergFetchExecutor<S: StateStore> {
    /// Context of the owning actor; its `id` is used in log lines and in the
    /// load-finished report.
    actor_ctx: ActorContextRef,

    /// Core component for managing external streaming source state.
    /// Wrapped in `Option` so that `into_stream` can take ownership of it.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Upstream list executor that provides the list of files to read.
    /// This executor is responsible for discovering new files and changes in the Iceberg table.
    /// Wrapped in `Option` so that `into_stream` can take ownership of it.
    upstream: Option<Executor>,

    /// Barrier manager used for reporting that the source load has finished.
    barrier_manager: LocalBarrierManager,

    /// Streaming configuration; the `developer.iceberg_fetch_batch_size` and
    /// `developer.chunk_size` settings are read from it when building batch readers.
    streaming_config: Arc<StreamingConfig>,

    /// Id of the table associated with this source; logged and included in the
    /// load-finished report.
    associated_table_id: TableId,
}
55
56impl<S: StateStore> BatchIcebergFetchExecutor<S> {
57    pub fn new(
58        actor_ctx: ActorContextRef,
59        stream_source_core: StreamSourceCore<S>,
60        upstream: Executor,
61        barrier_manager: LocalBarrierManager,
62        streaming_config: Arc<StreamingConfig>,
63        associated_table_id: Option<TableId>,
64    ) -> Self {
65        assert!(associated_table_id.is_some());
66        Self {
67            actor_ctx,
68            stream_source_core: Some(stream_source_core),
69            upstream: Some(upstream),
70            barrier_manager,
71            streaming_config,
72            associated_table_id: associated_table_id.unwrap(),
73        }
74    }
75}
76
77impl<S: StateStore> BatchIcebergFetchExecutor<S> {
78    #[try_stream(ok = Message, error = StreamExecutorError)]
79    async fn into_stream(mut self) {
80        let mut upstream = self.upstream.take().unwrap().execute();
81        let barrier = expect_first_barrier(&mut upstream).await?;
82        yield Message::Barrier(barrier);
83
84        let mut is_refreshing = false;
85        let mut is_list_finished = false;
86        let mut splits_on_fetch: usize = 0;
87        let is_load_finished = Arc::new(RwLock::new(false));
88        let mut file_queue = VecDeque::new();
89
90        let mut core = self.stream_source_core.take().unwrap();
91        let source_desc_builder = core.source_desc_builder.take().unwrap();
92        let source_desc = source_desc_builder
93            .build()
94            .map_err(StreamExecutorError::connector_error)?;
95
96        let file_path_idx = source_desc
97            .columns
98            .iter()
99            .position(|c| c.name == ICEBERG_FILE_PATH_COLUMN_NAME)
100            .unwrap();
101        let file_pos_idx = source_desc
102            .columns
103            .iter()
104            .position(|c| c.name == ICEBERG_FILE_POS_COLUMN_NAME)
105            .unwrap();
106
107        let mut stream = StreamReaderWithPause::<true, ChunksWithState>::new(
108            upstream,
109            stream::pending().boxed(),
110        );
111
112        while let Some(msg) = stream.next().await {
113            match msg {
114                Err(e) => {
115                    tracing::error!(error = %e.as_report(), "Fetch Error");
116                    file_queue.clear();
117                    *is_load_finished.write() = false;
118                    return Err(e);
119                }
120                Ok(msg) => {
121                    match msg {
122                        Either::Left(msg) => {
123                            match msg {
124                                Message::Barrier(barrier) => {
125                                    let mut need_rebuild_reader = false;
126                                    if let Some(mutation) = barrier.mutation.as_deref() {
127                                        match mutation {
128                                            Mutation::Pause => stream.pause_stream(),
129                                            Mutation::Resume => stream.resume_stream(),
130                                            Mutation::RefreshStart {
131                                                associated_source_id,
132                                                ..
133                                            } if associated_source_id == &core.source_id => {
134                                                tracing::info!(
135                                                    ?barrier.epoch,
136                                                    actor_id = %self.actor_ctx.id,
137                                                    source_id = %core.source_id,
138                                                    table_id = %self.associated_table_id,
139                                                    "RefreshStart:"
140                                                );
141
142                                                // reset states and abort current workload
143                                                file_queue.clear();
144                                                splits_on_fetch = 0;
145                                                is_refreshing = true;
146                                                is_list_finished = false;
147                                                *is_load_finished.write() = false;
148
149                                                need_rebuild_reader = true;
150                                            }
151                                            Mutation::ListFinish {
152                                                associated_source_id,
153                                            } if associated_source_id == &core.source_id => {
154                                                tracing::info!(
155                                                    ?barrier.epoch,
156                                                    actor_id = %self.actor_ctx.id,
157                                                    source_id = %core.source_id,
158                                                    table_id = %self.associated_table_id,
159                                                    "ListFinish:"
160                                                );
161                                                is_list_finished = true;
162                                            }
163                                            _ => {
164                                                // ignore other mutations
165                                            }
166                                        }
167                                    }
168
169                                    if splits_on_fetch == 0
170                                        && file_queue.is_empty()
171                                        && is_list_finished
172                                        && is_refreshing
173                                        && barrier.is_checkpoint()
174                                    {
175                                        tracing::info!(
176                                            ?barrier.epoch,
177                                            actor_id = %self.actor_ctx.id,
178                                            source_id = %core.source_id,
179                                            table_id = %self.associated_table_id,
180                                            "Reporting load finished"
181                                        );
182                                        self.barrier_manager.report_source_load_finished(
183                                            barrier.epoch,
184                                            self.actor_ctx.id,
185                                            self.associated_table_id,
186                                            core.source_id,
187                                        );
188
189                                        // reset flags
190                                        is_list_finished = false;
191                                        is_refreshing = false;
192                                    }
193
194                                    yield Message::Barrier(barrier);
195
196                                    if need_rebuild_reader
197                                        || (splits_on_fetch == 0
198                                            && !file_queue.is_empty()
199                                            && is_refreshing)
200                                    {
201                                        Self::replace_with_new_batch_reader(
202                                            &mut file_queue,
203                                            &mut stream,
204                                            self.streaming_config.clone(),
205                                            &mut splits_on_fetch,
206                                            source_desc.clone(),
207                                            is_load_finished.clone(),
208                                        )?;
209                                    }
210                                }
211                                Message::Chunk(chunk) => {
212                                    let jsonb_values: Vec<(String, JsonbVal)> = chunk
213                                        .data_chunk()
214                                        .rows()
215                                        .map(|row| {
216                                            let file_name = row.datum_at(0).unwrap().into_utf8();
217                                            let split = row.datum_at(1).unwrap().into_jsonb();
218                                            (file_name.to_owned(), split.to_owned_scalar())
219                                        })
220                                        .collect();
221                                    tracing::debug!(
222                                        "received file assignments: {:?}",
223                                        jsonb_values
224                                    );
225                                    file_queue.extend(jsonb_values);
226                                }
227                                Message::Watermark(_) => unreachable!(),
228                            }
229                        }
230                        Either::Right(ChunksWithState { chunks, .. }) => {
231                            splits_on_fetch -= 1;
232
233                            for chunk in &chunks {
234                                let chunk = prune_additional_cols(
235                                    chunk,
236                                    &[file_path_idx, file_pos_idx],
237                                    &source_desc.columns,
238                                );
239
240                                yield Message::Chunk(chunk);
241                            }
242                        }
243                    }
244                }
245            }
246        }
247    }
248
249    fn replace_with_new_batch_reader<const BIASED: bool>(
250        file_queue: &mut VecDeque<(String, JsonbVal)>,
251        stream: &mut StreamReaderWithPause<BIASED, ChunksWithState>,
252        streaming_config: Arc<StreamingConfig>,
253        splits_on_fetch: &mut usize,
254        source_desc: SourceDesc,
255        read_finished: Arc<RwLock<bool>>,
256    ) -> StreamExecutorResult<()> {
257        let mut batch =
258            Vec::with_capacity(streaming_config.developer.iceberg_fetch_batch_size as usize);
259        for _ in 0..streaming_config.developer.iceberg_fetch_batch_size {
260            if let Some((_, split_json)) = file_queue.pop_front() {
261                batch.push(PersistedFileScanTask::decode(split_json.as_scalar_ref())?);
262            } else {
263                break;
264            }
265        }
266
267        if batch.is_empty() {
268            stream.replace_data_stream(stream::pending().boxed());
269        } else {
270            tracing::debug!("building batch reader with {} files", batch.len());
271            *splits_on_fetch += batch.len();
272            *read_finished.write() = false;
273            let batch_reader = Self::build_batched_stream_reader(
274                source_desc,
275                batch,
276                streaming_config,
277                read_finished,
278            );
279            stream.replace_data_stream(batch_reader.boxed());
280        }
281
282        Ok(())
283    }
284
285    #[try_stream(ok = ChunksWithState, error = StreamExecutorError)]
286    async fn build_batched_stream_reader(
287        source_desc: SourceDesc,
288        read_batch: Vec<FileScanTask>,
289        streaming_config: Arc<StreamingConfig>,
290        read_finished: Arc<RwLock<bool>>,
291    ) {
292        let properties = source_desc.source.config.clone();
293        let properties = match properties {
294            risingwave_connector::source::ConnectorProperties::Iceberg(iceberg_properties) => {
295                iceberg_properties
296            }
297            _ => unreachable!(),
298        };
299        let table = properties.load_table().await?;
300
301        for task in read_batch {
302            let mut chunks = vec![];
303            #[for_await]
304            for chunk in scan_task_to_chunk_with_deletes(
305                table.clone(),
306                task,
307                IcebergScanOpts {
308                    chunk_size: streaming_config.developer.chunk_size,
309                    need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
310                    need_file_path_and_pos: true,
311                    handle_delete_files: true, // Enable delete file handling for streaming source
312                },
313                None,
314            ) {
315                let chunk = chunk?;
316
317                chunks.push(StreamChunk::from_parts(
318                    itertools::repeat_n(Op::Insert, chunk.capacity()).collect_vec(),
319                    chunk,
320                ));
321            }
322            yield ChunksWithState {
323                chunks,
324                data_file_path: "".to_owned(), /* we do not need data file path for refreshable iceberg fetch, as no state persisted */
325                last_read_pos: None,
326            };
327        }
328
329        *read_finished.write() = true;
330    }
331}
332
333impl<S: StateStore> Execute for BatchIcebergFetchExecutor<S> {
334    fn execute(self: Box<Self>) -> BoxedMessageStream {
335        self.into_stream().boxed()
336    }
337}
338
339impl<S: StateStore> Debug for BatchIcebergFetchExecutor<S> {
340    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
341        if let Some(core) = &self.stream_source_core {
342            f.debug_struct("BatchIcebergFetchExecutor")
343                .field("source_id", &core.source_id)
344                .field("column_ids", &core.column_ids)
345                .finish()
346        } else {
347            f.debug_struct("BatchIcebergFetchExecutor").finish()
348        }
349    }
350}