risingwave_connector/source/common.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
19
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage};
23
/// Utility function to convert a [`SourceMessage`] stream (obtained from a specific connector's [`SplitReader`](super::SplitReader))
/// into a [`StreamChunk`] stream (by invoking [`ByteStreamSourceParserImpl`](crate::parser::ByteStreamSourceParserImpl)).
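///
/// Per-partition source metrics (`partition_input_count` / `partition_input_bytes`) are updated
/// as a side effect for every batch that flows through.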
#[try_stream(boxed, ok = StreamChunk, error = ConnectorError)]
pub(crate) async fn into_chunk_stream(
    data_stream: impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send + 'static,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
) {
    let actor_id = source_ctx.actor_id.to_string();
    let fragment_id = source_ctx.fragment_id.to_string();
    let source_id = source_ctx.source_id.to_string();
    let source_name = source_ctx.source_name.clone();
    let metrics = source_ctx.metrics.clone();
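    // Guarded metric handles keyed by split id, created lazily on the first message seen from
    // each split so the label lookup only happens once per partition.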
    let mut partition_input_count = HashMap::new();
    let mut partition_bytes_count = HashMap::new();
39
    // add metrics to the data stream
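    // `inspect_ok` observes every successfully fetched batch as it flows towards the parser,
    // recording message counts and payload bytes per split.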
    let data_stream = data_stream
        .inspect_ok(move |data_batch| {
            let mut by_split_id = std::collections::HashMap::new();

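            // First pass: bucket the batch by split id and make sure both counter handles
            // exist for every split that appears in it.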
            for msg in data_batch {
                let split_id: String = msg.split_id.as_ref().to_owned();
                by_split_id
                    .entry(split_id.clone())
                    .or_insert_with(Vec::new)
                    .push(msg);
                partition_input_count
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        metrics.partition_input_count.with_guarded_label_values(&[
                            &actor_id,
                            &source_id,
                            &split_id,
                            &source_name,
                            &fragment_id,
                        ])
                    });
                partition_bytes_count
                    .entry(split_id.clone())
                    .or_insert_with(|| {
                        metrics.partition_input_bytes.with_guarded_label_values(&[
                            &actor_id,
                            &source_id,
                            &split_id,
                            &source_name,
                            &fragment_id,
                        ])
                    });
            }
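            // Second pass: bump the cached counters with the message count and total payload
            // bytes observed for each split.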
            for (split_id, msgs) in by_split_id {
                partition_input_count
                    .get_mut(&split_id)
                    .unwrap()
                    .inc_by(msgs.len() as u64);

                let sum_bytes = msgs
                    .iter()
                    .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
                    .sum();

                partition_bytes_count
                    .get_mut(&split_id)
                    .unwrap()
                    .inc_by(sum_bytes);
            }
        })
        .boxed();
92
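    // Hand the instrumented stream to the parser, which converts the raw payload bytes of each
    // message into `StreamChunk`s.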
    let parser =
        crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
    #[for_await]
    for chunk in parser.parse_stream(data_stream) {
        yield chunk?;
    }
}