risingwave_connector/source/
common.rs1use std::collections::HashMap;
15
16use futures::{Stream, StreamExt, TryStreamExt};
17use futures_async_stream::try_stream;
18use risingwave_common::array::StreamChunk;
19
20use crate::error::{ConnectorError, ConnectorResult};
21use crate::parser::ParserConfig;
22use crate::source::{SourceContextRef, SourceMessage};
23
24#[try_stream(boxed, ok = StreamChunk, error = ConnectorError)]
27pub(crate) async fn into_chunk_stream(
28 data_stream: impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send + 'static,
29 parser_config: ParserConfig,
30 source_ctx: SourceContextRef,
31) {
32 let actor_id = source_ctx.actor_id.to_string();
33 let fragment_id = source_ctx.fragment_id.to_string();
34 let source_id = source_ctx.source_id.to_string();
35 let source_name = source_ctx.source_name.clone();
36 let metrics = source_ctx.metrics.clone();
37 let mut partition_input_count = HashMap::new();
38 let mut partition_bytes_count = HashMap::new();
39
40 let data_stream = data_stream
42 .inspect_ok(move |data_batch| {
43 let mut by_split_id = std::collections::HashMap::new();
44
45 for msg in data_batch {
46 let split_id: String = msg.split_id.as_ref().to_owned();
47 by_split_id
48 .entry(split_id.clone())
49 .or_insert_with(Vec::new)
50 .push(msg);
51 partition_input_count
52 .entry(split_id.clone())
53 .or_insert_with(|| {
54 metrics.partition_input_count.with_guarded_label_values(&[
55 &actor_id,
56 &source_id,
57 &split_id.clone(),
58 &source_name,
59 &fragment_id,
60 ])
61 });
62 partition_bytes_count
63 .entry(split_id.clone())
64 .or_insert_with(|| {
65 metrics.partition_input_bytes.with_guarded_label_values(&[
66 &actor_id,
67 &source_id,
68 &split_id,
69 &source_name,
70 &fragment_id,
71 ])
72 });
73 }
74 for (split_id, msgs) in by_split_id {
75 partition_input_count
76 .get_mut(&split_id)
77 .unwrap()
78 .inc_by(msgs.len() as u64);
79
80 let sum_bytes = msgs
81 .iter()
82 .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
83 .sum();
84
85 partition_bytes_count
86 .get_mut(&split_id)
87 .unwrap()
88 .inc_by(sum_bytes);
89 }
90 })
91 .boxed();
92
93 let parser =
94 crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
95 #[for_await]
96 for chunk in parser.parse_stream(data_stream) {
97 yield chunk?;
98 }
99}