risingwave_connector/source/
util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use crate::error::{ConnectorError, ConnectorResult};
19use crate::source::SplitImpl;
20use crate::source::nats::split::NatsSplit;
21
22pub fn fill_adaptive_split(
23    split_template: &SplitImpl,
24    actor_in_use: &HashSet<u32>,
25) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
26    // Just Nats is adaptive for now
27    if let SplitImpl::Nats(split) = split_template {
28        let mut new_splits = BTreeMap::new();
29        for actor_id in actor_in_use {
30            let actor_id: Arc<str> = actor_id.to_string().into();
31            new_splits.insert(
32                actor_id.clone(),
33                SplitImpl::Nats(NatsSplit::new(
34                    split.subject.clone(),
35                    actor_id,
36                    split.start_sequence.clone(),
37                )),
38            );
39        }
40        tracing::debug!(
41            "Filled adaptive splits for Nats source, {} splits in total",
42            new_splits.len()
43        );
44        Ok(new_splits)
45    } else {
46        Err(ConnectorError::from(anyhow::anyhow!(
47            "Unsupported split type, expect Nats SplitImpl but get {:?}",
48            split_template
49        )))
50    }
51}
52
53/// A no-op source, which can be used to deprecate a source.
54/// When removing the code for a source, we can remove the source reader and executor,
55/// but we need to let the meta source manager running normally without panic.
56/// Otherwise the user can DROP the source.
57pub mod dummy {
58    use std::collections::HashMap;
59    use std::marker::PhantomData;
60
61    use risingwave_common::types::JsonbVal;
62    use serde::Deserialize;
63    use with_options::WithOptions;
64
65    use crate::error::ConnectorResult;
66    use crate::parser::ParserConfig;
67    use crate::source::{
68        BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
69        SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
70    };
71
72    /// See [`crate::source::util::dummy`].
73    #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
74    pub struct DummyProperties<T> {
75        _marker: PhantomData<T>,
76    }
77    /// See [`crate::source::util::dummy`].
78    pub struct DummySplitEnumerator<T> {
79        _marker: PhantomData<T>,
80    }
81    /// See [`crate::source::util::dummy`].
82    #[derive(Deserialize, Debug, PartialEq, Clone)]
83    pub struct DummySplit<T> {
84        _marker: PhantomData<T>,
85    }
86
87    impl<T> UnknownFields for DummyProperties<T> {
88        fn unknown_fields(&self) -> HashMap<String, String> {
89            HashMap::new()
90        }
91    }
92
93    impl<T> SplitMetaData for DummySplit<T> {
94        fn id(&self) -> SplitId {
95            unreachable!()
96        }
97
98        fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
99            unreachable!()
100        }
101
102        fn encode_to_json(&self) -> JsonbVal {
103            unreachable!()
104        }
105
106        fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
107            unreachable!()
108        }
109    }
110
111    #[async_trait::async_trait]
112    impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
113        type Properties = DummyProperties<T>;
114        type Split = DummySplit<T>;
115
116        async fn new(
117            _properties: Self::Properties,
118            _context: SourceEnumeratorContextRef,
119        ) -> crate::error::ConnectorResult<Self> {
120            Ok(Self {
121                _marker: PhantomData,
122            })
123        }
124
125        async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
126            // no op
127            Ok(vec![])
128        }
129    }
130
131    pub struct DummySourceReader<T> {
132        _marker: PhantomData<T>,
133    }
134
135    #[async_trait::async_trait]
136    impl<T: Send> SplitReader for DummySourceReader<T> {
137        type Properties = DummyProperties<T>;
138        type Split = DummySplit<T>;
139
140        async fn new(
141            _props: Self::Properties,
142            _splits: Vec<Self::Split>,
143            _parser_config: ParserConfig,
144            _source_ctx: SourceContextRef,
145            _columns: Option<Vec<Column>>,
146        ) -> crate::error::ConnectorResult<Self> {
147            Ok(Self {
148                _marker: PhantomData,
149            })
150        }
151
152        fn into_stream(self) -> BoxSourceChunkStream {
153            unreachable!()
154        }
155    }
156}