// risingwave_stream/executor/source/mod.rs

use std::collections::HashMap;
use std::time::Duration;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::row::Row;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{
    BoxSourceChunkStream, BoxStreamingFileSourceChunkStream, SourceColumnDesc, SplitId,
};
use risingwave_pb::plan_common::AdditionalColumn;
use risingwave_pb::plan_common::additional_column::ColumnType;
pub use state_table_handler::*;

mod executor_core;
pub use executor_core::StreamSourceCore;

mod reader_stream;

mod source_executor;
pub use source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;
mod iceberg_list_executor;
pub use iceberg_list_executor::*;
mod iceberg_fetch_executor;
pub use iceberg_fetch_executor::*;

mod source_backfill_state_table;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

/// Receive barriers from the barrier manager over `rx` and forward each one as a
/// `Message::Barrier`. Errors if the channel closes, since the barrier stream is
/// expected to outlive the executor.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
        yield Message::Barrier(barrier);
    }
    bail!("barrier reader closed unexpectedly");
}
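
// Usage sketch (illustrative, not part of the upstream module): an executor turns
// its barrier channel into a message stream and merges it with the data stream so
// barriers can interleave with chunks. `barrier_receiver` is an assumed
// `UnboundedReceiver<Barrier>` handed over by the barrier manager.
//
//     use futures::StreamExt;
//     let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();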

/// Collect the latest offset seen for each split in `chunk`, reading the hidden
/// split and offset columns at `split_idx` and `offset_idx`.
pub fn get_split_offset_mapping_from_chunk(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
    let mut split_offset_mapping = HashMap::new();
    // Iterate over the chunk's full capacity so that invisible rows also advance
    // the per-split offset.
    for i in 0..chunk.capacity() {
        let (_, row, _) = chunk.row_at(i);
        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
        split_offset_mapping.insert(split_id, offset.to_owned());
    }
    Some(split_offset_mapping)
}

/// Find the indices of the additional split and offset columns in `column_descs`,
/// if present. The split column is either a partition or a filename column.
pub fn get_split_offset_col_idx(
    column_descs: &[SourceColumnDesc],
) -> (Option<usize>, Option<usize>) {
    let mut split_idx = None;
    let mut offset_idx = None;
    for (idx, column) in column_descs.iter().enumerate() {
        match column.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                split_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                offset_idx = Some(idx);
            }
            _ => (),
        }
    }
    (split_idx, offset_idx)
}
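
// Usage sketch (illustrative, not part of the upstream module): the two helpers
// above are typically used together in an executor's data path. `source_desc`
// and `chunk` are assumed to come from the surrounding executor.
//
//     let (split_idx, offset_idx) = get_split_offset_col_idx(&source_desc.columns);
//     if let (Some(split_idx), Some(offset_idx)) = (split_idx, offset_idx) {
//         if let Some(mapping) =
//             get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx)
//         {
//             // persist `mapping` into the split state before emitting the chunk
//         }
//     }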

/// Project away the additional split and offset columns, keeping each of them
/// only if the user declared it as a visible column.
pub fn prune_additional_cols(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
    column_descs: &[SourceColumnDesc],
) -> StreamChunk {
    chunk.project(
        &(0..chunk.dimension())
            .filter(|&idx| {
                (idx != split_idx && idx != offset_idx) || column_descs[idx].is_visible()
            })
            .collect_vec(),
    )
}
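
// Continuing the sketch above (illustrative): once offsets are recorded, the
// hidden columns are pruned before the chunk is passed downstream.
//
//     let chunk = prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);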

#[try_stream(ok = StreamChunk, error = ConnectorError)]
pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
    if rate_limit_rps == Some(0) {
        // A rate limit of 0 pauses the stream entirely: park this future forever.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk = chunk?;
        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
    }
}
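
// Usage sketch (illustrative): wrap a connector's chunk stream before handing it
// to the executor. `source_chunk_stream` is an assumed upstream stream; `Some(0)`
// pauses the source and `None` disables limiting.
//
//     use futures::StreamExt;
//     let limited = apply_rate_limit(source_chunk_stream, Some(1000)).boxed();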

/// Same as [`apply_rate_limit`], but for streams whose items are
/// `Option<StreamChunk>`: `None` items are forwarded untouched so the caller can
/// still observe them (e.g. as boundaries between files).
#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
    stream: BoxStreamingFileSourceChunkStream,
    rate_limit_rps: Option<u32>,
) {
    if rate_limit_rps == Some(0) {
        // A rate limit of 0 pauses the stream entirely: park this future forever.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk_option = chunk?;
        match chunk_option {
            Some(chunk) => {
                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
                yield Some(processed_chunk);
            }
            None => yield None,
        }
    }
}

async fn process_chunk(
    chunk: StreamChunk,
    rate_limit_rps: Option<u32>,
    limiter: &RateLimiter,
) -> StreamChunk {
    let chunk_size = chunk.capacity();

    if rate_limit_rps.is_none() || chunk_size == 0 {
        // No rate limit configured, or nothing to emit: pass the chunk through.
        return chunk;
    }

    let limit = rate_limit_rps.unwrap() as u64;
    let required_permits = chunk.compute_rate_limit_chunk_permits();
    if required_permits > limit {
        // A single chunk should not need more permits than one second's budget;
        // log it loudly, then wait for the permits anyway.
        tracing::error!(
            chunk_size,
            required_permits,
            limit,
            "unexpected large chunk size"
        );
    }

    limiter.wait(required_permits).await;
    chunk
}

/// An infinite retry strategy: jittered exponential backoff based on a 1s base
/// delay, with every delay capped at 10s.
pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    const BASE_DELAY: Duration = Duration::from_secs(1);
    const BACKOFF_FACTOR: u64 = 2;
    const MAX_DELAY: Duration = Duration::from_secs(10);
    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
        .factor(BACKOFF_FACTOR)
        .max_delay(MAX_DELAY)
        .map(jitter)
}