risingwave_stream/executor/source/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::row::Row;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{
    BoxSourceChunkStream, BoxStreamingFileSourceChunkStream, SourceColumnDesc, SplitId,
};
use risingwave_pb::plan_common::AdditionalColumn;
use risingwave_pb::plan_common::additional_column::ColumnType;
pub use state_table_handler::*;

mod executor_core;
pub use executor_core::StreamSourceCore;

mod reader_stream;

mod source_executor;
pub use source_executor::*;
mod dummy_source_executor;
pub use dummy_source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;
mod iceberg_list_executor;
pub use iceberg_list_executor::*;
mod iceberg_fetch_executor;
pub use iceberg_fetch_executor::*;

mod source_backfill_state_table;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

/// Receives barriers from the barrier manager through the given channel and
/// forwards them as [`Message::Barrier`]s; errors if the channel is closed.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
        yield Message::Barrier(barrier);
    }
    bail!("barrier reader closed unexpectedly");
}
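
// Usage sketch (illustrative only, not part of this module): executors that
// drive a source typically merge this barrier stream with the data stream,
// polling barriers first so that checkpointing is never starved by data.
// `barrier_rx` and `data_stream` are assumed to come from the executor's
// context, with both streams yielding the same `Message` result type.
//
//     use futures::StreamExt;
//     use futures::stream::{PollNext, select_with_strategy};
//
//     let barrier_stream = barrier_to_message_stream(barrier_rx).boxed();
//     let merged = select_with_strategy(
//         barrier_stream,
//         data_stream,
//         |_: &mut ()| PollNext::Left, // always prefer barriers
//     );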

/// Builds a mapping from [`SplitId`] to the latest offset found in `chunk`,
/// reading the split and offset columns at the given indices.
pub fn get_split_offset_mapping_from_chunk(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
    let mut split_offset_mapping = HashMap::new();
    // All rows, whether visible or invisible, are used to update the source offset.
    for i in 0..chunk.capacity() {
        let (_, row, _) = chunk.row_at(i);
        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
        split_offset_mapping.insert(split_id, offset.to_owned());
    }
    Some(split_offset_mapping)
}

/// Returns the indices of the split column (a `Partition` or `Filename`
/// additional column) and the offset column in `column_descs`, if present.
pub fn get_split_offset_col_idx(
    column_descs: &[SourceColumnDesc],
) -> (Option<usize>, Option<usize>) {
    let mut split_idx = None;
    let mut offset_idx = None;
    for (idx, column) in column_descs.iter().enumerate() {
        match column.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                split_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                offset_idx = Some(idx);
            }
            _ => (),
        }
    }
    (split_idx, offset_idx)
}
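
// Usage sketch (illustrative only): a source executor typically resolves the
// column indices once from its column descriptors, then derives the
// split-to-offset mapping for every chunk it produces. `column_descs` and
// `chunk` are assumed to come from the executor's context.
//
//     let (split_idx, offset_idx) = get_split_offset_col_idx(&column_descs);
//     if let (Some(split_idx), Some(offset_idx)) = (split_idx, offset_idx) {
//         if let Some(mapping) =
//             get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx)
//         {
//             // e.g. persist the latest offset per split into source state
//         }
//     }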

/// Projects away the split and offset columns from `chunk` unless they are
/// user-visible columns.
pub fn prune_additional_cols(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
    column_descs: &[SourceColumnDesc],
) -> StreamChunk {
    chunk.project(
        &(0..chunk.dimension())
            .filter(|&idx| {
                (idx != split_idx && idx != offset_idx) || column_descs[idx].is_visible()
            })
            .collect_vec(),
    )
}
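
// Continuing the sketch above: once offsets have been recorded, the hidden
// split/offset columns can be projected away before yielding downstream.
//
//     let pruned = prune_additional_cols(&chunk, split_idx, offset_idx, &column_descs);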

/// Throttles a source chunk stream with `rate_limit_rps`. A limit of `Some(0)`
/// blocks the stream indefinitely, while `None` lets chunks pass through
/// unthrottled.
#[try_stream(ok = StreamChunk, error = ConnectorError)]
pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
    if rate_limit_rps == Some(0) {
        // Block the stream until the rate limit is reset.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk = chunk?;
        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
    }
}
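
// Usage sketch (illustrative only): a connector reader stream is wrapped and
// re-boxed before being handed to the executor. `source_stream` is an assumed
// `BoxSourceChunkStream`; `boxed()` comes from `futures::StreamExt`.
//
//     use futures::StreamExt;
//
//     let throttled: BoxSourceChunkStream =
//         apply_rate_limit(source_stream, Some(1000)).boxed();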

/// Variant of [`apply_rate_limit`] for streaming file source readers, whose
/// streams yield `Option<StreamChunk>`. `None` items are forwarded as-is;
/// `Some` chunks are throttled like in [`apply_rate_limit`].
#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
    stream: BoxStreamingFileSourceChunkStream,
    rate_limit_rps: Option<u32>,
) {
    if rate_limit_rps == Some(0) {
        // Block the stream until the rate limit is reset.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk_option = chunk?;
        match chunk_option {
            Some(chunk) => {
                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
                yield Some(processed_chunk);
            }
            None => yield None,
        }
    }
}

/// Waits on `limiter` for enough permits to cover `chunk`, then returns the
/// chunk unchanged. No-op when rate limiting is disabled or the chunk is empty.
async fn process_chunk(
    chunk: StreamChunk,
    rate_limit_rps: Option<u32>,
    limiter: &RateLimiter,
) -> StreamChunk {
    let chunk_size = chunk.capacity();

    if rate_limit_rps.is_none() || chunk_size == 0 {
        // No limit, or empty chunk.
        return chunk;
    }

    let limit = rate_limit_rps.unwrap() as u64;
    let required_permits = chunk.compute_rate_limit_chunk_permits();
    if required_permits > limit {
        // Chunks are expected to be split upstream so that a single chunk never
        // needs more permits than the rate limit allows; log if one still does.
        tracing::error!(
            chunk_size,
            required_permits,
            limit,
            "unexpected large chunk size"
        );
    }

    limiter.wait(required_permits).await;
    chunk
}

/// Returns an infinite backoff strategy for retry loops that should never give
/// up: an [`ExponentialBackoff`] capped at 10s, with jitter applied to every delay.
pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    const BASE_DELAY: Duration = Duration::from_secs(1);
    const BACKOFF_FACTOR: u64 = 2;
    const MAX_DELAY: Duration = Duration::from_secs(10);
    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
        .factor(BACKOFF_FACTOR)
        .max_delay(MAX_DELAY)
        .map(jitter)
}
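
// Usage sketch (illustrative only): the strategy plugs into `tokio_retry`,
// here retrying a hypothetical fallible `connect()` until it succeeds.
//
//     use tokio_retry::Retry;
//
//     let conn = Retry::spawn(get_infinite_backoff_strategy(), || async {
//         connect().await // hypothetical operation returning Result<_, _>
//     })
//     .await;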