// risingwave_connector/source/util.rs
use std::collections::BTreeMap;
use std::sync::Arc;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::SplitImpl;
use crate::source::google_pubsub::PubsubSplit;
use crate::source::nats::split::NatsSplit;
23pub fn fill_adaptive_split(
24 split_template: &SplitImpl,
25 actor_count: usize,
26) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
27 match split_template {
28 SplitImpl::Nats(split) => {
29 let mut new_splits = BTreeMap::new();
30 for idx in 0..actor_count {
31 let split_id: Arc<str> = idx.to_string().into();
32 new_splits.insert(
33 split_id.clone(),
34 SplitImpl::Nats(NatsSplit::new(
35 split.subject.clone(),
36 split_id,
37 split.start_sequence.clone(),
38 )),
39 );
40 }
41 tracing::debug!(
42 "Filled adaptive splits for Nats source, {} splits in total",
43 new_splits.len()
44 );
45 Ok(new_splits)
46 }
47 SplitImpl::GooglePubsub(split) => {
48 let mut new_splits = BTreeMap::new();
49 for idx in 0..actor_count {
50 let split_id: Arc<str> = format!("{}-{}", split.subscription, idx).into();
51 new_splits.insert(
52 split_id,
53 SplitImpl::GooglePubsub(PubsubSplit {
54 index: idx as u32,
55 subscription: split.subscription.clone(),
56 __deprecated_start_offset: None,
57 __deprecated_stop_offset: None,
58 }),
59 );
60 }
61 tracing::debug!(
62 "Filled adaptive splits for GooglePubsub source, {} splits in total",
63 new_splits.len()
64 );
65 Ok(new_splits)
66 }
67 _ => Err(ConnectorError::from(anyhow::anyhow!(
68 "Unsupported split type for adaptive splits: {:?}",
69 split_template
70 ))),
71 }
72}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::SplitMetaData;
    use crate::source::nats::split::NatsOffset;

    /// Pub/Sub: one split per actor, keyed by `<subscription>-<idx>`, with
    /// a matching `index` field and `SplitMetaData::id`.
    #[test]
    fn test_fill_adaptive_split_pubsub() {
        let template = SplitImpl::GooglePubsub(PubsubSplit {
            index: 0,
            subscription: "projects/p/subscriptions/s".to_owned(),
            __deprecated_start_offset: None,
            __deprecated_stop_offset: None,
        });

        let filled = fill_adaptive_split(&template, 3).unwrap();
        assert_eq!(filled.len(), 3);

        // BTreeMap iteration order matches the actor index here, since the
        // keys differ only in the trailing single-digit index.
        let mut idx = 0usize;
        for (split_id, split) in &filled {
            let expected_id = format!("projects/p/subscriptions/s-{}", idx);
            assert_eq!(split_id.as_ref(), expected_id.as_str());
            match split {
                SplitImpl::GooglePubsub(ps) => {
                    assert_eq!(ps.index, idx as u32);
                    assert_eq!(ps.subscription, "projects/p/subscriptions/s");
                    let expected_split_id: Arc<str> = expected_id.as_str().into();
                    assert_eq!(ps.id(), expected_split_id);
                }
                _ => panic!("expected GooglePubsub split"),
            }
            idx += 1;
        }
    }

    /// Pub/Sub degenerate case: a single actor yields exactly one split.
    #[test]
    fn test_fill_adaptive_split_pubsub_single_actor() {
        let template = SplitImpl::GooglePubsub(PubsubSplit {
            index: 0,
            subscription: "sub".to_owned(),
            __deprecated_start_offset: None,
            __deprecated_stop_offset: None,
        });

        let filled = fill_adaptive_split(&template, 1).unwrap();
        assert_eq!(filled.len(), 1);
        assert!(filled.contains_key("sub-0"));
    }

    /// NATS: one split per actor, keyed by the actor index as a string.
    #[test]
    fn test_fill_adaptive_split_nats() {
        let template = SplitImpl::Nats(NatsSplit::new(
            "test-subject".to_owned(),
            "0".into(),
            NatsOffset::None,
        ));

        let filled = fill_adaptive_split(&template, 4).unwrap();
        assert_eq!(filled.len(), 4);

        for idx in 0..4 {
            let key = idx.to_string();
            assert!(filled.contains_key(key.as_str()));
        }
    }

    /// Any split type other than NATS / Pub/Sub is rejected.
    #[test]
    fn test_fill_adaptive_split_unsupported() {
        let template = SplitImpl::Kafka(crate::source::kafka::split::KafkaSplit::new(
            0,
            None,
            None,
            "topic".into(),
        ));
        assert!(fill_adaptive_split(&template, 2).is_err());
    }
}
150pub mod dummy {
155 use std::collections::HashMap;
156 use std::marker::PhantomData;
157
158 use risingwave_common::types::JsonbVal;
159 use serde::Deserialize;
160 use with_options::WithOptions;
161
162 use crate::enforce_secret::EnforceSecret;
163 use crate::error::ConnectorResult;
164 use crate::parser::ParserConfig;
165 use crate::source::{
166 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
167 SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
168 };
169
170 #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
172 pub struct DummyProperties<T> {
173 _marker: PhantomData<T>,
174 }
175
176 impl<T> EnforceSecret for DummyProperties<T> {}
177
178 pub struct DummySplitEnumerator<T> {
180 _marker: PhantomData<T>,
181 }
182 #[derive(Deserialize, Debug, PartialEq, Clone)]
184 pub struct DummySplit<T> {
185 _marker: PhantomData<T>,
186 }
187
188 impl<T> UnknownFields for DummyProperties<T> {
189 fn unknown_fields(&self) -> HashMap<String, String> {
190 HashMap::new()
191 }
192 }
193
194 impl<T> SplitMetaData for DummySplit<T> {
195 fn id(&self) -> SplitId {
196 unreachable!()
197 }
198
199 fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
200 unreachable!()
201 }
202
203 fn encode_to_json(&self) -> JsonbVal {
204 unreachable!()
205 }
206
207 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
208 unreachable!()
209 }
210 }
211
212 #[async_trait::async_trait]
213 impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
214 type Properties = DummyProperties<T>;
215 type Split = DummySplit<T>;
216
217 async fn new(
218 _properties: Self::Properties,
219 _context: SourceEnumeratorContextRef,
220 ) -> crate::error::ConnectorResult<Self> {
221 Ok(Self {
222 _marker: PhantomData,
223 })
224 }
225
226 async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
227 Ok(vec![])
229 }
230 }
231
232 pub struct DummySourceReader<T> {
233 _marker: PhantomData<T>,
234 }
235
236 #[async_trait::async_trait]
237 impl<T: Send> SplitReader for DummySourceReader<T> {
238 type Properties = DummyProperties<T>;
239 type Split = DummySplit<T>;
240
241 async fn new(
242 _props: Self::Properties,
243 _splits: Vec<Self::Split>,
244 _parser_config: ParserConfig,
245 _source_ctx: SourceContextRef,
246 _columns: Option<Vec<Column>>,
247 ) -> crate::error::ConnectorResult<Self> {
248 Ok(Self {
249 _marker: PhantomData,
250 })
251 }
252
253 fn into_stream(self) -> BoxSourceChunkStream {
254 unreachable!()
255 }
256 }
257}