risingwave_stream/executor/source/
mod.rs1use std::collections::HashMap;
16use std::time::Duration;
17
18use await_tree::InstrumentAwait;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::bail;
22use risingwave_common::row::Row;
23use risingwave_common_rate_limit::RateLimiter;
24use risingwave_connector::error::ConnectorError;
25use risingwave_connector::source::{
26 BoxSourceChunkStream, BoxSourceReaderEventStream, BoxStreamingFileSourceChunkStream,
27 SourceColumnDesc, SourceReaderEvent, SplitId,
28};
29use risingwave_pb::plan_common::AdditionalColumn;
30use risingwave_pb::plan_common::additional_column::ColumnType;
31pub use state_table_handler::*;
32
33mod executor_core;
34pub use executor_core::StreamSourceCore;
35
36mod reader_stream;
37
38mod source_executor;
39pub use source_executor::*;
40mod dummy_source_executor;
41pub use dummy_source_executor::*;
42mod source_backfill_executor;
43pub use source_backfill_executor::*;
44mod fs_list_executor;
45pub use fs_list_executor::*;
46mod fs_fetch_executor;
47pub use fs_fetch_executor::*;
48mod iceberg_list_executor;
49pub use iceberg_list_executor::*;
50mod iceberg_fetch_executor;
51pub use iceberg_fetch_executor::*;
52mod batch_source; pub use batch_source::*;
54mod source_backfill_state_table;
55pub(crate) use source_backfill_state_table::BackfillStateTableHandler;
56
57pub mod state_table_handler;
58use futures_async_stream::try_stream;
59use tokio::sync::mpsc::UnboundedReceiver;
60use tokio_retry::strategy::{ExponentialBackoff, jitter};
61
62use crate::executor::error::StreamExecutorError;
63use crate::executor::{Barrier, Message};
64
65#[try_stream(ok = Message, error = StreamExecutorError)]
67pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
68 while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
69 yield Message::Barrier(barrier);
70 }
71 bail!("barrier reader closed unexpectedly");
72}
73
74pub fn get_split_offset_mapping_from_chunk(
75 chunk: &StreamChunk,
76 split_idx: usize,
77 offset_idx: usize,
78) -> Option<HashMap<SplitId, String>> {
79 let mut split_offset_mapping = HashMap::new();
80 for i in 0..chunk.capacity() {
82 let (_, row, _) = chunk.row_at(i);
83 let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
84 let offset = row.datum_at(offset_idx).unwrap().into_utf8();
85 split_offset_mapping.insert(split_id, offset.to_owned());
86 }
87 Some(split_offset_mapping)
88}
89
90pub fn get_split_offset_col_idx(
92 column_descs: &[SourceColumnDesc],
93) -> (Option<usize>, Option<usize>, Option<usize>) {
94 let mut split_idx = None;
95 let mut offset_idx = None;
96 let mut pulsar_message_id_idx = None;
97 for (idx, column) in column_descs.iter().enumerate() {
98 match column.additional_column {
99 AdditionalColumn {
100 column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
101 } => {
102 split_idx = Some(idx);
103 }
104 AdditionalColumn {
105 column_type: Some(ColumnType::Offset(_)),
106 } => {
107 offset_idx = Some(idx);
108 }
109 AdditionalColumn {
110 column_type: Some(ColumnType::PulsarMessageIdData(_)),
111 } => {
112 pulsar_message_id_idx = Some(idx);
113 }
114 _ => (),
115 }
116 }
117 (split_idx, offset_idx, pulsar_message_id_idx)
118}
119
120pub fn prune_additional_cols(
121 chunk: &StreamChunk,
122 to_prune_indices: &[usize],
123 column_descs: &[SourceColumnDesc],
124) -> StreamChunk {
125 chunk.project(
126 &(0..chunk.dimension())
127 .filter(|&idx| !to_prune_indices.contains(&idx) || column_descs[idx].is_visible())
128 .collect_vec(),
129 )
130}
131
132#[try_stream(ok = StreamChunk, error = ConnectorError)]
133pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
134 if rate_limit_rps == Some(0) {
135 let future = futures::future::pending::<()>();
137 future.await;
138 unreachable!();
139 }
140
141 let limiter = RateLimiter::new(
142 rate_limit_rps
143 .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
144 .into(),
145 );
146
147 #[for_await]
148 for chunk in stream {
149 let chunk = chunk?;
150 yield process_chunk(chunk, rate_limit_rps, &limiter).await;
151 }
152}
153
154#[try_stream(ok = SourceReaderEvent, error = ConnectorError)]
155pub async fn apply_rate_limit_to_source_reader_event(
156 stream: BoxSourceReaderEventStream,
157 rate_limit_rps: Option<u32>,
158) {
159 if rate_limit_rps == Some(0) {
160 let future = futures::future::pending::<()>();
162 future.await;
163 unreachable!();
164 }
165
166 let limiter = RateLimiter::new(
167 rate_limit_rps
168 .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
169 .into(),
170 );
171
172 #[for_await]
173 for event in stream {
174 match event? {
175 SourceReaderEvent::DataChunk(chunk) => {
176 yield SourceReaderEvent::DataChunk(
177 process_chunk(chunk, rate_limit_rps, &limiter).await,
178 )
179 }
180 SourceReaderEvent::SplitProgress(progress) => {
181 yield SourceReaderEvent::SplitProgress(progress)
182 }
183 }
184 }
185}
186
187#[try_stream(ok = StreamChunk, error = ConnectorError)]
188pub async fn source_reader_event_to_chunk_stream(stream: BoxSourceReaderEventStream) {
189 #[for_await]
190 for event in stream {
191 match event? {
192 SourceReaderEvent::DataChunk(chunk) => yield chunk,
193 SourceReaderEvent::SplitProgress(_) => {}
194 }
195 }
196}
197
198#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
199pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
200 stream: BoxStreamingFileSourceChunkStream,
201 rate_limit_rps: Option<u32>,
202) {
203 if rate_limit_rps == Some(0) {
204 let future = futures::future::pending::<()>();
206 future.await;
207 unreachable!();
208 }
209
210 let limiter = RateLimiter::new(
211 rate_limit_rps
212 .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
213 .into(),
214 );
215
216 #[for_await]
217 for chunk in stream {
218 let chunk_option = chunk?;
219 match chunk_option {
220 Some(chunk) => {
221 let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
222 yield Some(processed_chunk);
223 }
224 None => yield None,
225 }
226 }
227}
228
229async fn process_chunk(
230 chunk: StreamChunk,
231 rate_limit_rps: Option<u32>,
232 limiter: &RateLimiter,
233) -> StreamChunk {
234 let chunk_size = chunk.capacity();
235
236 if rate_limit_rps.is_none() || chunk_size == 0 {
237 return chunk;
239 }
240
241 let limit = rate_limit_rps.unwrap() as u64;
242 let required_permits = chunk.rate_limit_permits();
243 if required_permits > limit {
244 tracing::error!(
246 chunk_size,
247 required_permits,
248 limit,
249 "unexpected large chunk size"
250 );
251 }
252
253 limiter.wait(required_permits).await;
254 chunk
255}
256
257pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
258 const BASE_DELAY: Duration = Duration::from_secs(1);
259 const BACKOFF_FACTOR: u64 = 2;
260 const MAX_DELAY: Duration = Duration::from_secs(10);
261 ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
262 .factor(BACKOFF_FACTOR)
263 .max_delay(MAX_DELAY)
264 .map(jitter)
265}