risingwave_stream/executor/source/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::row::Row;
use risingwave_common_rate_limit::RateLimiter;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{
    BoxSourceChunkStream, BoxStreamingFileSourceChunkStream, SourceColumnDesc, SplitId,
};
use risingwave_pb::plan_common::AdditionalColumn;
use risingwave_pb::plan_common::additional_column::ColumnType;
pub use state_table_handler::*;

mod executor_core;
pub use executor_core::StreamSourceCore;

mod reader_stream;

mod source_executor;
pub use source_executor::*;
mod dummy_source_executor;
pub use dummy_source_executor::*;
mod source_backfill_executor;
pub use source_backfill_executor::*;
mod fs_list_executor;
pub use fs_list_executor::*;
mod fs_fetch_executor;
pub use fs_fetch_executor::*;
mod iceberg_list_executor;
pub use iceberg_list_executor::*;
mod iceberg_fetch_executor;
pub use iceberg_fetch_executor::*;
mod batch_source; // For refreshable batch source executors.
pub use batch_source::*;
mod source_backfill_state_table;
pub(crate) use source_backfill_state_table::BackfillStateTableHandler;

pub mod state_table_handler;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

/// Receives barriers from the barrier manager over the given channel and yields each one
/// as a [`Message::Barrier`]. Errors when the channel is closed, which is unexpected.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
        yield Message::Barrier(barrier);
    }
    bail!("barrier reader closed unexpectedly");
}
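
// Usage sketch (illustrative only, not part of this module): drive the stream from an
// unbounded channel and consume it with `futures::StreamExt`. The channel setup and
// the consuming loop below are assumptions about the caller.
//
//     use futures::StreamExt;
//
//     let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<Barrier>();
//     let mut stream = std::pin::pin!(barrier_to_message_stream(rx));
//     while let Some(msg) = stream.next().await {
//         let _msg = msg?; // yields `Message::Barrier(..)`; errors once the channel closes
//     }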

/// Builds a mapping from split ID to the latest offset seen in `chunk`, reading the split
/// and offset columns at `split_idx` and `offset_idx`. Later rows overwrite earlier ones,
/// so each split maps to its last offset in the chunk. Always returns `Some` at present.
pub fn get_split_offset_mapping_from_chunk(
    chunk: &StreamChunk,
    split_idx: usize,
    offset_idx: usize,
) -> Option<HashMap<SplitId, String>> {
    let mut split_offset_mapping = HashMap::new();
    // All rows, both visible and invisible, are used to update the source offset.
    for i in 0..chunk.capacity() {
        let (_, row, _) = chunk.row_at(i);
        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
        split_offset_mapping.insert(split_id, offset.to_owned());
    }
    Some(split_offset_mapping)
}

/// Gets the indices of the split, offset, and Pulsar message ID columns, if present.
pub fn get_split_offset_col_idx(
    column_descs: &[SourceColumnDesc],
) -> (Option<usize>, Option<usize>, Option<usize>) {
    let mut split_idx = None;
    let mut offset_idx = None;
    let mut pulsar_message_id_idx = None;
    for (idx, column) in column_descs.iter().enumerate() {
        match column.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                split_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                offset_idx = Some(idx);
            }
            AdditionalColumn {
                column_type: Some(ColumnType::PulsarMessageIdData(_)),
            } => {
                pulsar_message_id_idx = Some(idx);
            }
            _ => (),
        }
    }
    (split_idx, offset_idx, pulsar_message_id_idx)
}

/// Projects away the additional columns listed in `to_prune_indices`. A column is kept
/// if it is not slated for pruning, or if it is user-visible.
pub fn prune_additional_cols(
    chunk: &StreamChunk,
    to_prune_indices: &[usize],
    column_descs: &[SourceColumnDesc],
) -> StreamChunk {
    chunk.project(
        &(0..chunk.dimension())
            .filter(|&idx| !to_prune_indices.contains(&idx) || column_descs[idx].is_visible())
            .collect_vec(),
    )
}
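
// Putting the helpers above together (sketch; `chunk` and `column_descs` are assumed to
// come from a source reader and its schema, not from this module):
//
//     let (split_idx, offset_idx, _) = get_split_offset_col_idx(&column_descs);
//     if let (Some(split_idx), Some(offset_idx)) = (split_idx, offset_idx) {
//         // Record the latest offset per split for state persistence...
//         let mapping = get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx);
//         // ...then drop the bookkeeping columns before emitting downstream.
//         let chunk = prune_additional_cols(&chunk, &[split_idx, offset_idx], &column_descs);
//     }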

/// Applies a rate limit to a source chunk stream. `None` means no limit; `Some(0)`
/// blocks the stream entirely until the rate limit is reset.
#[try_stream(ok = StreamChunk, error = ConnectorError)]
pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
    if rate_limit_rps == Some(0) {
        // Block the stream until the rate limit is reset.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk = chunk?;
        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
    }
}
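
// Usage sketch (illustrative): wrap a connector's boxed chunk stream before handing it to
// an executor. `reader` is a hypothetical `BoxSourceChunkStream`; `boxed()` comes from
// `futures::StreamExt`.
//
//     use futures::StreamExt;
//
//     let limited: BoxSourceChunkStream =
//         apply_rate_limit(reader, Some(1000)).boxed(); // at most ~1000 permits/sec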

/// Variant of [`apply_rate_limit`] for streaming file source readers, whose streams yield
/// `Option<StreamChunk>`. `None` items are passed through without consuming any permits.
#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
    stream: BoxStreamingFileSourceChunkStream,
    rate_limit_rps: Option<u32>,
) {
    if rate_limit_rps == Some(0) {
        // Block the stream until the rate limit is reset.
        let future = futures::future::pending::<()>();
        future.await;
        unreachable!();
    }

    let limiter = RateLimiter::new(
        rate_limit_rps
            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
            .into(),
    );

    #[for_await]
    for chunk in stream {
        let chunk_option = chunk?;
        match chunk_option {
            Some(chunk) => {
                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
                yield Some(processed_chunk);
            }
            None => yield None,
        }
    }
}

/// Acquires the permits required by `chunk` from `limiter`, then returns the chunk.
/// Returns immediately when there is no limit or the chunk is empty.
async fn process_chunk(
    chunk: StreamChunk,
    rate_limit_rps: Option<u32>,
    limiter: &RateLimiter,
) -> StreamChunk {
    let chunk_size = chunk.capacity();

    if rate_limit_rps.is_none() || chunk_size == 0 {
        // No limit, or empty chunk.
        return chunk;
    }

    let limit = rate_limit_rps.unwrap() as u64;
    let required_permits = chunk.rate_limit_permits();
    if required_permits > limit {
        // A chunk is not expected to require more permits than the per-second limit
        // allows; log and proceed anyway.
        tracing::error!(
            chunk_size,
            required_permits,
            limit,
            "unexpected large chunk size"
        );
    }

    limiter.wait(required_permits).await;
    chunk
}

/// Returns an infinite iterator of jittered, exponentially increasing delays, capped at
/// 10 seconds, for retry loops that should never give up.
pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
    const BASE_DELAY: Duration = Duration::from_secs(1);
    const BACKOFF_FACTOR: u64 = 2;
    const MAX_DELAY: Duration = Duration::from_secs(10);
    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
        .factor(BACKOFF_FACTOR)
        .max_delay(MAX_DELAY)
        .map(jitter)
}
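
// Usage sketch (illustrative): retry an operation until it succeeds, with jittered
// backoff. The `connect` closure is a hypothetical fallible async operation assumed
// for this example; `Retry::spawn` is from the `tokio_retry` crate.
//
//     use tokio_retry::Retry;
//
//     let conn = Retry::spawn(get_infinite_backoff_strategy(), || async {
//         connect().await // hypothetical: retried on every `Err`
//     })
//     .await?;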