risingwave_stream/executor/source/
fs_list_executor.rs1use anyhow::anyhow;
16use either::Either;
17use futures::TryStreamExt;
18use futures_async_stream::try_stream;
19use risingwave_common::array::Op;
20use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
21use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
22use thiserror_ext::AsReport;
23use tokio::sync::mpsc::UnboundedReceiver;
24
25use super::{StreamSourceCore, barrier_to_message_stream};
26use crate::executor::prelude::*;
27use crate::executor::stream_reader::StreamReaderWithPause;
28
/// Executor that lists the items of a source (via `get_source_list`) and emits one
/// single-row chunk per listed item, interleaved with the barriers it receives.
pub struct FsListExecutor<S: StateStore> {
    /// Actor context; its `id` is reported in error messages.
    actor_ctx: ActorContextRef,

    /// Source core holding the source descriptor builder, the source id and the column ids.
    /// Temporarily moved out while the source descriptor is built when the executor starts.
    stream_source_core: Option<StreamSourceCore<S>>,

    /// Streaming metrics; currently unused by this executor.
    #[expect(dead_code)]
    metrics: Arc<StreamingMetrics>,

    /// Receiver of barriers; taken once when the executor starts running.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameters reader; currently unused by this executor.
    #[expect(dead_code)]
    system_params: SystemParamsReaderRef,

    /// Configured rate limit (presumably rows per second — confirm); currently not applied
    /// by this executor.
    #[expect(dead_code)]
    rate_limit_rps: Option<u32>,
}
50
51impl<S: StateStore> FsListExecutor<S> {
52 pub fn new(
53 actor_ctx: ActorContextRef,
54 stream_source_core: Option<StreamSourceCore<S>>,
55 metrics: Arc<StreamingMetrics>,
56 barrier_receiver: UnboundedReceiver<Barrier>,
57 system_params: SystemParamsReaderRef,
58 rate_limit_rps: Option<u32>,
59 ) -> Self {
60 Self {
61 actor_ctx,
62 stream_source_core,
63 metrics,
64 barrier_receiver: Some(barrier_receiver),
65 system_params,
66 rate_limit_rps,
67 }
68 }
69
70 fn build_chunked_paginate_stream(
71 &self,
72 source_desc: &SourceDesc,
73 ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<StreamChunk>> + use<S>> {
74 let stream = source_desc
75 .source
76 .get_source_list()
77 .map_err(StreamExecutorError::connector_error)?
78 .map_err(StreamExecutorError::connector_error);
79
80 let processed_stream = stream.map(|item| match item {
81 Ok(page_item) => {
82 let row = (
83 Op::Insert,
84 OwnedRow::new(vec![
85 Some(ScalarImpl::Utf8(page_item.name.into_boxed_str())),
86 Some(ScalarImpl::Timestamptz(page_item.timestamp)),
87 Some(ScalarImpl::Int64(page_item.size)),
88 ]),
89 );
90 Ok(StreamChunk::from_rows(
91 &[row],
92 &[DataType::Varchar, DataType::Timestamptz, DataType::Int64],
93 ))
94 }
95 Err(e) => {
96 tracing::error!(error = %e.as_report(), "Connector failed to list item");
97 Err(e)
98 }
99 });
100
101 Ok(processed_stream)
102 }
103
104 #[try_stream(ok = Message, error = StreamExecutorError)]
105 async fn into_stream(mut self) {
106 let mut barrier_receiver = self.barrier_receiver.take().unwrap();
107 let barrier = barrier_receiver
108 .recv()
109 .instrument_await("source_recv_first_barrier")
110 .await
111 .ok_or_else(|| {
112 anyhow!(
113 "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
114 self.actor_ctx.id,
115 self.stream_source_core.as_ref().unwrap().source_id
116 )
117 })?;
118
119 let mut core = self.stream_source_core.unwrap();
120
121 let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
123 let source_desc = source_desc_builder
124 .build()
125 .map_err(StreamExecutorError::connector_error)?;
126
127 self.stream_source_core = Some(core);
129
130 let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc)?;
131
132 let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
133 let mut stream =
134 StreamReaderWithPause::<true, _>::new(barrier_stream, chunked_paginate_stream);
135
136 if barrier.is_pause_on_startup() {
137 stream.pause_stream();
138 }
139
140 yield Message::Barrier(barrier);
141
142 while let Some(msg) = stream.next().await {
143 match msg {
144 Err(e) => {
145 tracing::warn!(error = %e.as_report(), "encountered an error, recovering");
146 stream.replace_data_stream(self.build_chunked_paginate_stream(&source_desc)?);
147 }
148 Ok(msg) => match msg {
149 Either::Left(msg) => match &msg {
151 Message::Barrier(barrier) => {
152 if let Some(mutation) = barrier.mutation.as_deref() {
153 match mutation {
154 Mutation::Pause => stream.pause_stream(),
155 Mutation::Resume => stream.resume_stream(),
156 _ => (),
157 }
158 }
159
160 yield msg;
162 }
163 _ => unreachable!(),
165 },
166 Either::Right(chunk) => {
168 yield Message::Chunk(chunk);
169 }
170 },
171 }
172 }
173 }
174}
175
176impl<S: StateStore> Execute for FsListExecutor<S> {
177 fn execute(self: Box<Self>) -> BoxedMessageStream {
178 self.into_stream().boxed()
179 }
180}
181
182impl<S: StateStore> Debug for FsListExecutor<S> {
183 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184 if let Some(core) = &self.stream_source_core {
185 f.debug_struct("FsListExecutor")
186 .field("source_id", &core.source_id)
187 .field("column_ids", &core.column_ids)
188 .finish()
189 } else {
190 f.debug_struct("FsListExecutor").finish()
191 }
192 }
193}