risingwave_stream/executor/source/
fs_list_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use either::Either;
17use futures::TryStreamExt;
18use futures_async_stream::try_stream;
19use risingwave_common::array::Op;
20use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
21use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
22use thiserror_ext::AsReport;
23use tokio::sync::mpsc::UnboundedReceiver;
24
25use super::{StreamSourceCore, barrier_to_message_stream};
26use crate::executor::prelude::*;
27use crate::executor::stream_reader::StreamReaderWithPause;
28
/// Executor that lists the entries exposed by a source connector and emits each listed entry
/// as a single-row `StreamChunk`, interleaved with the barriers it receives.
pub struct FsListExecutor<S: StateStore> {
    /// Context of the actor running this executor; its `id` is reported in the error raised
    /// when the first barrier cannot be received.
    actor_ctx: ActorContextRef,

    /// Core state of the streaming source (source id, column ids and the source description
    /// builder). `into_stream` unwraps it, so it is expected to be `Some` when streaming starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Metrics for monitoring. Currently unused, hence `#[expect(dead_code)]`.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of the barrier channel. Taken (leaving `None`) when `into_stream` starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameter reader to read the barrier interval. Currently unused, hence
    /// `#[expect(dead_code)]`.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s. Currently unused, hence `#[expect(dead_code)]`.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,
}
50
51impl<S: StateStore> FsListExecutor<S> {
52    pub fn new(
53        actor_ctx: ActorContextRef,
54        stream_source_core: Option<StreamSourceCore<S>>,
55        metrics: Arc<StreamingMetrics>,
56        barrier_receiver: UnboundedReceiver<Barrier>,
57        system_params: SystemParamsReaderRef,
58        rate_limit_rps: Option<u32>,
59    ) -> Self {
60        Self {
61            actor_ctx,
62            stream_source_core,
63            metrics,
64            barrier_receiver: Some(barrier_receiver),
65            system_params,
66            rate_limit_rps,
67        }
68    }
69
70    fn build_chunked_paginate_stream(
71        &self,
72        source_desc: &SourceDesc,
73    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<StreamChunk>> + use<S>> {
74        let stream = source_desc
75            .source
76            .get_source_list()
77            .map_err(StreamExecutorError::connector_error)?
78            .map_err(StreamExecutorError::connector_error);
79
80        let processed_stream = stream.map(|item| match item {
81            Ok(page_item) => {
82                let row = (
83                    Op::Insert,
84                    OwnedRow::new(vec![
85                        Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())),
86                        Some(ScalarImpl::Timestamptz(page_item.timestamp)),
87                        Some(ScalarImpl::Int64(page_item.size)),
88                    ]),
89                );
90                Ok(StreamChunk::from_rows(
91                    &[row],
92                    &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
93                ))
94            }
95            Err(e) => {
96                tracing::error!(error = %e.as_report(), "Connector failed to list item");
97                Err(e)
98            }
99        });
100
101        Ok(processed_stream)
102    }
103
104    #[try_stream(ok = Message, error = StreamExecutorError)]
105    async fn into_stream(mut self) {
106        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
107        let barrier = barrier_receiver
108            .recv()
109            .instrument_await("source_recv_first_barrier")
110            .await
111            .ok_or_else(|| {
112                anyhow!(
113                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
114                    self.actor_ctx.id,
115                    self.stream_source_core.as_ref().unwrap().source_id
116                )
117            })?;
118
119        let mut core = self.stream_source_core.unwrap();
120
121        // Build source description from the builder.
122        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
123        let source_desc = source_desc_builder
124            .build()
125            .map_err(StreamExecutorError::connector_error)?;
126
127        // Return the ownership of `stream_source_core` to the source executor.
128        self.stream_source_core = Some(core);
129
130        let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc)?;
131
132        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
133        let mut stream =
134            StreamReaderWithPause::<true, _>::new(barrier_stream, chunked_paginate_stream);
135
136        if barrier.is_pause_on_startup() {
137            stream.pause_stream();
138        }
139
140        yield Message::Barrier(barrier);
141
142        while let Some(msg) = stream.next().await {
143            match msg {
144                Err(e) => {
145                    tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
146                    stream.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?);
147                }
148                Ok(msg) => match msg {
149                    // Barrier arrives.
150                    Either::Left(msg) => match &msg {
151                        Message::Barrier(barrier) => {
152                            if let Some(mutation) = barrier.mutation.as_deref() {
153                                match mutation {
154                                    Mutation::Pause => stream.pause_stream(),
155                                    Mutation::Resume => stream.resume_stream(),
156                                    _ => (),
157                                }
158                            }
159
160                            // Propagate the barrier.
161                            yield msg;
162                        }
163                        // Only barrier can be received.
164                        _ => unreachable!(),
165                    },
166                    // Chunked FsPage arrives.
167                    Either::Right(chunk) => {
168                        yield Message::Chunk(chunk);
169                    }
170                },
171            }
172        }
173    }
174}
175
176impl<S: StateStore> Execute for FsListExecutor<S> {
177    fn execute(self: Box<Self>) -> BoxedMessageStream {
178        self.into_stream().boxed()
179    }
180}
181
182impl<S: StateStore> Debug for FsListExecutor<S> {
183    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184        if let Some(core) = &self.stream_source_core {
185            f.debug_struct("FsListExecutor")
186                .field("source_id", &core.source_id)
187                .field("column_ids", &core.column_ids)
188                .finish()
189        } else {
190            f.debug_struct("FsListExecutor").finish()
191        }
192    }
193}