risingwave_stream/executor/source/mod.rs

use std::collections::HashMap;
use std::time::Duration;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::row::Row;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{
    BoxSourceChunkStream, BoxStreamingFileSourceChunkStream, SourceColumnDesc, SplitId,
};
use risingwave_pb::plan_common::AdditionalColumn;
use risingwave_pb::plan_common::additional_column::ColumnType;
pub use state_table_handler::*;

mod executor_core;
pub use executor_core::StreamSourceCore;

mod reader_stream;

mod source_executor;
pub use source_executor::*;
mod dummy_source_executor;
pub use dummy_source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;
mod iceberg_list_executor;
pub use iceberg_list_executor::*;
mod iceberg_fetch_executor;
pub use iceberg_fetch_executor::*;
mod batch_source;
pub use batch_source::*;
mod source_backfill_state_table;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

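/// Converts a channel of [`Barrier`]s into a stream of [`Message::Barrier`]s,
/// bailing out with an error if the channel closes.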
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
        yield Message::Barrier(barrier);
    }
    bail!("barrier reader closed unexpectedly");
}
72
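/// Extracts a `SplitId -> latest offset` mapping from the split and offset
/// columns of `chunk`. Rows are scanned in order, so for each split the map
/// retains the offset of its last row.
///
/// A sketch of the intended call site; the column indices are assumed to come
/// from [`get_split_offset_col_idx`]:
///
/// ```ignore
/// let (split_idx, offset_idx, _) = get_split_offset_col_idx(&column_descs);
/// if let (Some(split_idx), Some(offset_idx)) = (split_idx, offset_idx)
///     && let Some(mapping) = get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx)
/// {
///     // e.g. persist the latest offset per split into source state
/// }
/// ```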
pub fn get_split_offset_mapping_from_chunk(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
    let mut split_offset_mapping = HashMap::new();
    // Scan the chunk's full capacity so that invisible rows also advance the
    // per-split offset.
    for i in 0..chunk.capacity() {
        let (_, row, _) = chunk.row_at(i);
        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
        split_offset_mapping.insert(split_id, offset.to_owned());
    }
    Some(split_offset_mapping)
}

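/// Scans `column_descs` for the additional columns carrying split/partition,
/// offset, and Pulsar message-id data, returning
/// `(split_idx, offset_idx, pulsar_message_id_idx)`. Each entry is `None` if
/// the corresponding column is absent.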
pub fn get_split_offset_col_idx(
    column_descs: &[SourceColumnDesc],
) -> (Option<usize>, Option<usize>, Option<usize>) {
    let mut split_idx = None;
    let mut offset_idx = None;
    let mut pulsar_message_id_idx = None;
    for (idx, column) in column_descs.iter().enumerate() {
        match column.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                split_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                offset_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::PulsarMessageIdData(_)),
            } => {
                pulsar_message_id_idx = Some(idx);
            }
            _ => (),
        }
    }
    (split_idx, offset_idx, pulsar_message_id_idx)
}

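/// Projects away the columns listed in `to_prune_indices`, keeping a listed
/// column only when it is marked visible. All other columns pass through
/// unchanged.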
pub fn prune_additional_cols(
    chunk: &StreamChunk,
    to_prune_indices: &[usize],
    column_descs: &[SourceColumnDesc],
) -> StreamChunk {
    chunk.project(
        &(0..chunk.dimension())
            .filter(|&idx| !to_prune_indices.contains(&idx) || column_descs[idx].is_visible())
            .collect_vec(),
    )
}

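/// Wraps a source chunk stream with rate limiting. `None` disables the limit,
/// `Some(0)` suspends the stream entirely, and `Some(n)` throttles chunks by
/// acquiring permits from the shared [`RateLimiter`].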
#[try_stream(ok = StreamChunk, error = ConnectorError)]
pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
    if rate_limit_rps == Some(0) {
        // Block the stream indefinitely: a rate limit of 0 pauses the source.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk = chunk?;
        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
    }
}

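/// Variant of [`apply_rate_limit`] for streaming file source readers, whose
/// streams yield `Option<StreamChunk>`: `Some` chunks are rate-limited as
/// above, while `None` markers are forwarded untouched.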
#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
    stream: BoxStreamingFileSourceChunkStream,
    rate_limit_rps: Option<u32>,
) {
    if rate_limit_rps == Some(0) {
        // Block the stream indefinitely: a rate limit of 0 pauses the source.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk_option = chunk?;
        match chunk_option {
            Some(chunk) => {
                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
                yield Some(processed_chunk);
            }
            None => yield None,
        }
    }
}

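/// Waits on `limiter` for the permits this chunk requires, then returns the
/// chunk. Chunks pass through immediately when no limit is set or the chunk
/// is empty.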
async fn process_chunk(
    chunk: StreamChunk,
    rate_limit_rps: Option<u32>,
    limiter: &RateLimiter,
) -> StreamChunk {
    let chunk_size = chunk.capacity();

    if rate_limit_rps.is_none() || chunk_size == 0 {
        // No rate limit, or nothing to emit: pass the chunk through.
        return chunk;
    }

    let limit = rate_limit_rps.unwrap() as u64;
    let required_permits = chunk.rate_limit_permits();
    if required_permits > limit {
        // Log (but do not panic) if a single chunk requires more permits than
        // the limit allows.
        tracing::error!(
            chunk_size,
            required_permits,
            limit,
            "unexpected large chunk size"
        );
    }

    limiter.wait(required_permits).await;
    chunk
}

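/// Infinite retry strategy: exponential backoff from the 1s base with factor
/// 2, capped at 10s per attempt, with jitter applied to every delay.
///
/// A minimal usage sketch; `connect` is a hypothetical fallible async
/// operation, not part of this module:
///
/// ```ignore
/// use tokio_retry::Retry;
///
/// let conn = Retry::spawn(get_infinite_backoff_strategy(), || connect()).await?;
/// ```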
pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    const BASE_DELAY: Duration = Duration::from_secs(1);
    const BACKOFF_FACTOR: u64 = 2;
    const MAX_DELAY: Duration = Duration::from_secs(10);
    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
        .factor(BACKOFF_FACTOR)
        .max_delay(MAX_DELAY)
        .map(jitter)
}