// risingwave_stream/executor/source/mod.rs

use std::collections::HashMap;
use std::time::Duration;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::row::Row;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{
    BoxSourceChunkStream, BoxStreamingFileSourceChunkStream, SourceColumnDesc, SplitId,
};
use risingwave_pb::plan_common::AdditionalColumn;
use risingwave_pb::plan_common::additional_column::ColumnType;
pub use state_table_handler::*;

mod executor_core;
pub use executor_core::StreamSourceCore;

mod reader_stream;

mod source_executor;
pub use source_executor::*;
mod dummy_source_executor;
pub use dummy_source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;
mod iceberg_list_executor;
pub use iceberg_list_executor::*;
mod iceberg_fetch_executor;
pub use iceberg_fetch_executor::*;

mod source_backfill_state_table;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

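/// Convert a barrier receiver into a stream of [`Message::Barrier`]s.
/// The stream bails once the channel closes, because the barrier channel is
/// expected to outlive the executor that reads from it.
///
/// A minimal usage sketch; the channel setup and the `barrier` value are
/// illustrative, not part of this module:
///
/// ```ignore
/// use futures::StreamExt;
///
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let mut messages = barrier_to_message_stream(rx).boxed();
/// // Every barrier sent on `tx` comes out as a `Message::Barrier`.
/// tx.send(barrier).unwrap();
/// let msg = messages.next().await.unwrap()?;
/// ```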
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
        yield Message::Barrier(barrier);
    }
    bail!("barrier reader closed unexpectedly");
}

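/// Extract a `SplitId -> offset` mapping from a source chunk, using the
/// columns at `split_idx` and `offset_idx`. Later rows overwrite earlier
/// ones, so each split maps to the latest offset observed in the chunk.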
pub fn get_split_offset_mapping_from_chunk(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
    let mut split_offset_mapping = HashMap::new();
    // Scan every row slot, visible or not: offsets must advance even for rows
    // that are filtered out of the visible chunk.
    for i in 0..chunk.capacity() {
        let (_, row, _) = chunk.row_at(i);
        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
        split_offset_mapping.insert(split_id, offset.to_owned());
    }
    Some(split_offset_mapping)
}

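/// Find the indices of the hidden split and offset columns, if any. A
/// partition column (e.g. Kafka) or a filename column (e.g. file sources)
/// serves as the split column.
///
/// A sketch of the typical pairing with
/// [`get_split_offset_mapping_from_chunk`], assuming both columns exist:
///
/// ```ignore
/// let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&column_descs) else {
///     bail!("split or offset column not found");
/// };
/// let mapping = get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx);
/// ```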
pub fn get_split_offset_col_idx(
    column_descs: &[SourceColumnDesc],
) -> (Option<usize>, Option<usize>) {
    let mut split_idx = None;
    let mut offset_idx = None;
    for (idx, column) in column_descs.iter().enumerate() {
        match column.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                split_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                offset_idx = Some(idx);
            }
            _ => (),
        }
    }
    (split_idx, offset_idx)
}

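/// Drop the split and offset columns from `chunk` unless the corresponding
/// column descriptor marks them as user-visible; all other columns are kept.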
pub fn prune_additional_cols(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
    column_descs: &[SourceColumnDesc],
) -> StreamChunk {
    chunk.project(
        &(0..chunk.dimension())
            .filter(|&idx| {
                (idx != split_idx && idx != offset_idx) || column_descs[idx].is_visible()
            })
            .collect_vec(),
    )
}

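/// Throttle a source chunk stream to at most `rate_limit_rps` rows per
/// second. `None` disables throttling and `Some(0)` pauses the stream
/// indefinitely.
///
/// A minimal sketch of wrapping a connector stream; `chunk_stream` is
/// illustrative:
///
/// ```ignore
/// use futures::StreamExt;
///
/// let throttled: BoxSourceChunkStream =
///     apply_rate_limit(chunk_stream, Some(1000)).boxed();
/// ```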
#[try_stream(ok = StreamChunk, error = ConnectorError)]
pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
    if rate_limit_rps == Some(0) {
        // A rate limit of 0 pauses the stream: park on a future that never
        // resolves instead of polling the upstream.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk = chunk?;
        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
    }
}

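/// Variant of [`apply_rate_limit`] for streaming file source readers, whose
/// items are `Option<StreamChunk>`: `None` items are forwarded as-is without
/// consuming any rate-limit budget, and only `Some` chunks are throttled.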
#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
    stream: BoxStreamingFileSourceChunkStream,
    rate_limit_rps: Option<u32>,
) {
    if rate_limit_rps == Some(0) {
        // A rate limit of 0 pauses the stream: park on a future that never
        // resolves instead of polling the upstream.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk_option = chunk?;
        match chunk_option {
            Some(chunk) => {
                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
                yield Some(processed_chunk);
            }
            None => yield None,
        }
    }
}

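/// Wait for enough rate-limit permits to cover `chunk`, then return it
/// unchanged. Chunks needing more permits than the per-second limit are
/// logged as unexpectedly large before waiting.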
async fn process_chunk(
    chunk: StreamChunk,
    rate_limit_rps: Option<u32>,
    limiter: &RateLimiter,
) -> StreamChunk {
    let chunk_size = chunk.capacity();

    // No limit configured, or nothing to admit.
    if rate_limit_rps.is_none() || chunk_size == 0 {
        return chunk;
    }

    let limit = rate_limit_rps.unwrap() as u64;
    let required_permits = chunk.compute_rate_limit_chunk_permits();
    if required_permits > limit {
        tracing::error!(
            chunk_size,
            required_permits,
            limit,
            "unexpected large chunk size"
        );
    }

    limiter.wait(required_permits).await;
    chunk
}

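/// An infinite retry backoff: delays grow exponentially from a 1s base,
/// capped at 10s, with jitter applied to every delay.
///
/// A minimal sketch with `tokio_retry`; the `connect` closure is
/// illustrative:
///
/// ```ignore
/// use tokio_retry::Retry;
///
/// // Retries forever with jittered, exponentially growing delays (<= 10s).
/// let conn = Retry::spawn(get_infinite_backoff_strategy(), || async {
///     connect().await
/// })
/// .await?;
/// ```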
pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    const BASE_DELAY: Duration = Duration::from_secs(1);
    const BACKOFF_FACTOR: u64 = 2;
    const MAX_DELAY: Duration = Duration::from_secs(10);
    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
        .factor(BACKOFF_FACTOR)
        .max_delay(MAX_DELAY)
        .map(jitter)
}