risingwave_stream/executor/source/
mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::HashMap;
16use std::time::Duration;
17
18use await_tree::InstrumentAwait;
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::bail;
22use risingwave_common::row::Row;
23use risingwave_common_rate_limit::RateLimiter;
24use risingwave_connector::error::ConnectorError;
25use risingwave_connector::source::{
26    BoxSourceChunkStream, BoxSourceReaderEventStream, BoxStreamingFileSourceChunkStream,
27    SourceColumnDesc, SourceReaderEvent, SplitId,
28};
29use risingwave_pb::plan_common::AdditionalColumn;
30use risingwave_pb::plan_common::additional_column::ColumnType;
31pub use state_table_handler::*;
32
33mod executor_core;
34pub use executor_core::StreamSourceCore;
35
36mod reader_stream;
37
38mod source_executor;
39pub use source_executor::*;
40mod dummy_source_executor;
41pub use dummy_source_executor::*;
42mod source_backfill_executor;
43pub use source_backfill_executor::*;
44mod fs_list_executor;
45pub use fs_list_executor::*;
46mod fs_fetch_executor;
47pub use fs_fetch_executor::*;
48mod iceberg_list_executor;
49pub use iceberg_list_executor::*;
50mod iceberg_fetch_executor;
51pub use iceberg_fetch_executor::*;
52mod batch_source; // For refreshable batch source executors
53pub use batch_source::*;
54mod source_backfill_state_table;
55pub(crate) use source_backfill_state_table::BackfillStateTableHandler;
56
57pub mod state_table_handler;
58use futures_async_stream::try_stream;
59use tokio::sync::mpsc::UnboundedReceiver;
60use tokio_retry::strategy::{ExponentialBackoff, jitter};
61
62use crate::executor::error::StreamExecutorError;
63use crate::executor::{Barrier, Message};
64
65/// Receive barriers from barrier manager with the channel, error on channel close.
66#[try_stream(ok = Message, error = StreamExecutorError)]
67pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
68    while let Some(barrier) = rx.recv().instrument_await("receive_barrier").await {
69        yield Message::Barrier(barrier);
70    }
71    bail!("barrier reader closed unexpectedly");
72}
73
74pub fn get_split_offset_mapping_from_chunk(
75    chunk: &StreamChunk,
76    split_idx: usize,
77    offset_idx: usize,
78) -> Option<HashMap<SplitId, String>> {
79    let mut split_offset_mapping = HashMap::new();
80    // All rows (including those visible or invisible) will be used to update the source offset.
81    for i in 0..chunk.capacity() {
82        let (_, row, _) = chunk.row_at(i);
83        let split_id = row.datum_at(split_idx).unwrap().into_utf8().into();
84        let offset = row.datum_at(offset_idx).unwrap().into_utf8();
85        split_offset_mapping.insert(split_id, offset.to_owned());
86    }
87    Some(split_offset_mapping)
88}
89
90/// Get the indices of the split, offset, and pulsar message id columns.
91pub fn get_split_offset_col_idx(
92    column_descs: &[SourceColumnDesc],
93) -> (Option<usize>, Option<usize>, Option<usize>) {
94    let mut split_idx = None;
95    let mut offset_idx = None;
96    let mut pulsar_message_id_idx = None;
97    for (idx, column) in column_descs.iter().enumerate() {
98        match column.additional_column {
99            AdditionalColumn {
100                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
101            } => {
102                split_idx = Some(idx);
103            }
104            AdditionalColumn {
105                column_type: Some(ColumnType::Offset(_)),
106            } => {
107                offset_idx = Some(idx);
108            }
109            AdditionalColumn {
110                column_type: Some(ColumnType::PulsarMessageIdData(_)),
111            } => {
112                pulsar_message_id_idx = Some(idx);
113            }
114            _ => (),
115        }
116    }
117    (split_idx, offset_idx, pulsar_message_id_idx)
118}
119
120pub fn prune_additional_cols(
121    chunk: &StreamChunk,
122    to_prune_indices: &[usize],
123    column_descs: &[SourceColumnDesc],
124) -> StreamChunk {
125    chunk.project(
126        &(0..chunk.dimension())
127            .filter(|&idx| !to_prune_indices.contains(&idx) || column_descs[idx].is_visible())
128            .collect_vec(),
129    )
130}
131
132#[try_stream(ok = StreamChunk, error = ConnectorError)]
133pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Option<u32>) {
134    if rate_limit_rps == Some(0) {
135        // block the stream until the rate limit is reset
136        let future = futures::future::pending::<()>();
137        future.await;
138        unreachable!();
139    }
140
141    let limiter = RateLimiter::new(
142        rate_limit_rps
143            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
144            .into(),
145    );
146
147    #[for_await]
148    for chunk in stream {
149        let chunk = chunk?;
150        yield process_chunk(chunk, rate_limit_rps, &limiter).await;
151    }
152}
153
154#[try_stream(ok = SourceReaderEvent, error = ConnectorError)]
155pub async fn apply_rate_limit_to_source_reader_event(
156    stream: BoxSourceReaderEventStream,
157    rate_limit_rps: Option<u32>,
158) {
159    if rate_limit_rps == Some(0) {
160        // block the stream until the rate limit is reset
161        let future = futures::future::pending::<()>();
162        future.await;
163        unreachable!();
164    }
165
166    let limiter = RateLimiter::new(
167        rate_limit_rps
168            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
169            .into(),
170    );
171
172    #[for_await]
173    for event in stream {
174        match event? {
175            SourceReaderEvent::DataChunk(chunk) => {
176                yield SourceReaderEvent::DataChunk(
177                    process_chunk(chunk, rate_limit_rps, &limiter).await,
178                )
179            }
180            SourceReaderEvent::SplitProgress(progress) => {
181                yield SourceReaderEvent::SplitProgress(progress)
182            }
183        }
184    }
185}
186
187#[try_stream(ok = StreamChunk, error = ConnectorError)]
188pub async fn source_reader_event_to_chunk_stream(stream: BoxSourceReaderEventStream) {
189    #[for_await]
190    for event in stream {
191        match event? {
192            SourceReaderEvent::DataChunk(chunk) => yield chunk,
193            SourceReaderEvent::SplitProgress(_) => {}
194        }
195    }
196}
197
198#[try_stream(ok = Option<StreamChunk>, error = ConnectorError)]
199pub async fn apply_rate_limit_with_for_streaming_file_source_reader(
200    stream: BoxStreamingFileSourceChunkStream,
201    rate_limit_rps: Option<u32>,
202) {
203    if rate_limit_rps == Some(0) {
204        // block the stream until the rate limit is reset
205        let future = futures::future::pending::<()>();
206        future.await;
207        unreachable!();
208    }
209
210    let limiter = RateLimiter::new(
211        rate_limit_rps
212            .inspect(|limit| tracing::info!(rate_limit = limit, "rate limit applied"))
213            .into(),
214    );
215
216    #[for_await]
217    for chunk in stream {
218        let chunk_option = chunk?;
219        match chunk_option {
220            Some(chunk) => {
221                let processed_chunk = process_chunk(chunk, rate_limit_rps, &limiter).await;
222                yield Some(processed_chunk);
223            }
224            None => yield None,
225        }
226    }
227}
228
229async fn process_chunk(
230    chunk: StreamChunk,
231    rate_limit_rps: Option<u32>,
232    limiter: &RateLimiter,
233) -> StreamChunk {
234    let chunk_size = chunk.capacity();
235
236    if rate_limit_rps.is_none() || chunk_size == 0 {
237        // no limit, or empty chunk
238        return chunk;
239    }
240
241    let limit = rate_limit_rps.unwrap() as u64;
242    let required_permits = chunk.rate_limit_permits();
243    if required_permits > limit {
244        // This should not happen after the mentioned PR.
245        tracing::error!(
246            chunk_size,
247            required_permits,
248            limit,
249            "unexpected large chunk size"
250        );
251    }
252
253    limiter.wait(required_permits).await;
254    chunk
255}
256
257pub fn get_infinite_backoff_strategy() -> impl Iterator<Item = Duration> {
258    const BASE_DELAY: Duration = Duration::from_secs(1);
259    const BACKOFF_FACTOR: u64 = 2;
260    const MAX_DELAY: Duration = Duration::from_secs(10);
261    ExponentialBackoff::from_millis(BASE_DELAY.as_millis() as u64)
262        .factor(BACKOFF_FACTOR)
263        .max_delay(MAX_DELAY)
264        .map(jitter)
265}