risingwave_connector/source/
util.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17
18use crate::error::{ConnectorError, ConnectorResult};
19use crate::source::SplitImpl;
20use crate::source::google_pubsub::PubsubSplit;
21use crate::source::nats::split::NatsSplit;
22
23pub fn fill_adaptive_split(
24    split_template: &SplitImpl,
25    actor_count: usize,
26) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
27    match split_template {
28        SplitImpl::Nats(split) => {
29            let mut new_splits = BTreeMap::new();
30            for idx in 0..actor_count {
31                let split_id: Arc<str> = idx.to_string().into();
32                new_splits.insert(
33                    split_id.clone(),
34                    SplitImpl::Nats(NatsSplit::new(
35                        split.subject.clone(),
36                        split_id,
37                        split.start_sequence.clone(),
38                    )),
39                );
40            }
41            tracing::debug!(
42                "Filled adaptive splits for Nats source, {} splits in total",
43                new_splits.len()
44            );
45            Ok(new_splits)
46        }
47        SplitImpl::GooglePubsub(split) => {
48            let mut new_splits = BTreeMap::new();
49            for idx in 0..actor_count {
50                let split_id: Arc<str> = format!("{}-{}", split.subscription, idx).into();
51                new_splits.insert(
52                    split_id,
53                    SplitImpl::GooglePubsub(PubsubSplit {
54                        index: idx as u32,
55                        subscription: split.subscription.clone(),
56                        __deprecated_start_offset: None,
57                        __deprecated_stop_offset: None,
58                    }),
59                );
60            }
61            tracing::debug!(
62                "Filled adaptive splits for GooglePubsub source, {} splits in total",
63                new_splits.len()
64            );
65            Ok(new_splits)
66        }
67        _ => Err(ConnectorError::from(anyhow::anyhow!(
68            "Unsupported split type for adaptive splits: {:?}",
69            split_template
70        ))),
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use crate::source::SplitMetaData;
    use crate::source::nats::split::NatsOffset;

    /// Builds a Google Pub/Sub split template for `subscription`.
    fn pubsub_template(subscription: &str) -> SplitImpl {
        SplitImpl::GooglePubsub(PubsubSplit {
            index: 0,
            subscription: subscription.to_owned(),
            __deprecated_start_offset: None,
            __deprecated_stop_offset: None,
        })
    }

    #[test]
    fn test_fill_adaptive_split_pubsub() {
        // Use more than ten actors on purpose. The result is a `BTreeMap` ordered
        // lexicographically by id (`"s-10"` sorts before `"s-2"`), so splits are looked up
        // by id instead of being paired with iteration order.
        let actor_count = 12;
        let splits =
            fill_adaptive_split(&pubsub_template("projects/p/subscriptions/s"), actor_count)
                .unwrap();
        assert_eq!(splits.len(), actor_count);

        for idx in 0..actor_count {
            let expected_id = format!("projects/p/subscriptions/s-{}", idx);
            let split = splits
                .get(expected_id.as_str())
                .unwrap_or_else(|| panic!("missing split {}", expected_id));
            if let SplitImpl::GooglePubsub(ps) = split {
                assert_eq!(ps.index, idx as u32);
                assert_eq!(ps.subscription, "projects/p/subscriptions/s");
                let expected_split_id: Arc<str> = expected_id.as_str().into();
                assert_eq!(ps.id(), expected_split_id);
            } else {
                panic!("expected GooglePubsub split");
            }
        }
    }

    #[test]
    fn test_fill_adaptive_split_pubsub_single_actor() {
        let splits = fill_adaptive_split(&pubsub_template("sub"), 1).unwrap();
        assert_eq!(splits.len(), 1);
        assert!(splits.contains_key("sub-0"));
    }

    #[test]
    fn test_fill_adaptive_split_zero_actors() {
        let splits = fill_adaptive_split(&pubsub_template("sub"), 0).unwrap();
        assert!(splits.is_empty());
    }

    #[test]
    fn test_fill_adaptive_split_nats() {
        let template = SplitImpl::Nats(NatsSplit::new(
            "test-subject".to_owned(),
            "0".into(),
            NatsOffset::None,
        ));

        let splits = fill_adaptive_split(&template, 4).unwrap();
        assert_eq!(splits.len(), 4);

        for idx in 0..4 {
            let expected_id = idx.to_string();
            let split = splits
                .get(expected_id.as_str())
                .unwrap_or_else(|| panic!("missing split {}", expected_id));
            if let SplitImpl::Nats(ns) = split {
                // Every filled split must inherit the template's subject and offset.
                assert_eq!(ns.subject, "test-subject");
                assert!(matches!(ns.start_sequence, NatsOffset::None));
                let expected_split_id: Arc<str> = expected_id.as_str().into();
                assert_eq!(ns.id(), expected_split_id);
            } else {
                panic!("expected Nats split");
            }
        }
    }

    #[test]
    fn test_fill_adaptive_split_unsupported() {
        // Use a Kafka split (not adaptive) to test the error path.
        let template = SplitImpl::Kafka(crate::source::kafka::split::KafkaSplit::new(
            0,
            None,
            None,
            "topic".into(),
        ));
        let result = fill_adaptive_split(&template, 2);
        assert!(result.is_err());
    }
}
149
/// A no-op source, which can be used to deprecate a source.
///
/// When removing the code for a source, we can remove the source reader and executor,
/// but we still need the meta source manager to keep running normally without panicking;
/// otherwise the user cannot DROP the source.
pub mod dummy {
    use std::collections::HashMap;
    use std::marker::PhantomData;

    use risingwave_common::types::JsonbVal;
    use serde::Deserialize;
    use with_options::WithOptions;

    use crate::enforce_secret::EnforceSecret;
    use crate::error::ConnectorResult;
    use crate::parser::ParserConfig;
    use crate::source::{
        BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
        SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
    };

    /// See [`crate::source::util::dummy`].
    ///
    /// Carries no options of its own. `T` is only a type-level marker held via
    /// `PhantomData`; presumably it distinguishes the dummy types of different
    /// deprecated sources (TODO confirm against the call sites).
    #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
    pub struct DummyProperties<T> {
        _marker: PhantomData<T>,
    }

    /// Dummy properties hold no fields that could contain secrets; this impl relies
    /// entirely on the trait's default methods.
    impl<T> EnforceSecret for DummyProperties<T> {}

    /// See [`crate::source::util::dummy`].
    pub struct DummySplitEnumerator<T> {
        _marker: PhantomData<T>,
    }

    /// See [`crate::source::util::dummy`].
    ///
    /// The dummy enumerator below never lists any split of this type.
    #[derive(Deserialize, Debug, PartialEq, Clone)]
    pub struct DummySplit<T> {
        _marker: PhantomData<T>,
    }

    /// Dummy properties declare no options, so there are never unknown fields to report.
    impl<T> UnknownFields for DummyProperties<T> {
        fn unknown_fields(&self) -> HashMap<String, String> {
            HashMap::new()
        }
    }

    /// Every method panics via `unreachable!()`.
    ///
    /// This relies on no `DummySplit` ever being created: the dummy enumerator's
    /// `list_splits` always returns an empty list. NOTE(review): confirm that no persisted
    /// split state of a deprecated source can still reach `restore_from_json`.
    impl<T> SplitMetaData for DummySplit<T> {
        fn id(&self) -> SplitId {
            unreachable!()
        }

        fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
            unreachable!()
        }

        fn encode_to_json(&self) -> JsonbVal {
            unreachable!()
        }

        fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
            unreachable!()
        }
    }

    /// An enumerator that always succeeds in construction and never reports any splits.
    #[async_trait::async_trait]
    impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
        type Properties = DummyProperties<T>;
        type Split = DummySplit<T>;

        async fn new(
            _properties: Self::Properties,
            _context: SourceEnumeratorContextRef,
        ) -> crate::error::ConnectorResult<Self> {
            Ok(Self {
                _marker: PhantomData,
            })
        }

        async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
            // No-op: a dummy source has no splits to enumerate.
            Ok(vec![])
        }
    }

    /// See [`crate::source::util::dummy`].
    ///
    /// Construction always succeeds and ignores all of its inputs, but `into_stream`
    /// panics.
    pub struct DummySourceReader<T> {
        _marker: PhantomData<T>,
    }

    /// A reader that can be constructed but must never be turned into a stream.
    #[async_trait::async_trait]
    impl<T: Send> SplitReader for DummySourceReader<T> {
        type Properties = DummyProperties<T>;
        type Split = DummySplit<T>;

        async fn new(
            _props: Self::Properties,
            _splits: Vec<Self::Split>,
            _parser_config: ParserConfig,
            _source_ctx: SourceContextRef,
            _columns: Option<Vec<Column>>,
        ) -> crate::error::ConnectorResult<Self> {
            Ok(Self {
                _marker: PhantomData,
            })
        }

        /// Panics via `unreachable!()`.
        fn into_stream(self) -> BoxSourceChunkStream {
            unreachable!()
        }
    }
}