risingwave_connector/source/
util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use crate::error::{ConnectorError, ConnectorResult};
19use crate::source::SplitImpl;
20use crate::source::nats::split::NatsSplit;
21
22pub fn fill_adaptive_split(
23    split_template: &SplitImpl,
24    actor_in_use: &HashSet<u32>,
25) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
26    // Just Nats is adaptive for now
27    if let SplitImpl::Nats(split) = split_template {
28        let mut new_splits = BTreeMap::new();
29        for actor_id in actor_in_use {
30            let actor_id: Arc<str> = actor_id.to_string().into();
31            new_splits.insert(
32                actor_id.clone(),
33                SplitImpl::Nats(NatsSplit::new(
34                    split.subject.clone(),
35                    actor_id,
36                    split.start_sequence.clone(),
37                )),
38            );
39        }
40        tracing::debug!(
41            "Filled adaptive splits for Nats source, {} splits in total",
42            new_splits.len()
43        );
44        Ok(new_splits)
45    } else {
46        Err(ConnectorError::from(anyhow::anyhow!(
47            "Unsupported split type, expect Nats SplitImpl but get {:?}",
48            split_template
49        )))
50    }
51}
52
53/// A no-op source, which can be used to deprecate a source.
54/// When removing the code for a source, we can remove the source reader and executor,
55/// but we need to let the meta source manager running normally without panic.
56/// Otherwise the user can DROP the source.
57pub mod dummy {
58    use std::collections::HashMap;
59    use std::marker::PhantomData;
60
61    use risingwave_common::types::JsonbVal;
62    use serde::Deserialize;
63    use with_options::WithOptions;
64
65    use crate::enforce_secret::EnforceSecret;
66    use crate::error::ConnectorResult;
67    use crate::parser::ParserConfig;
68    use crate::source::{
69        BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
70        SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
71    };
72
73    /// See [`crate::source::util::dummy`].
74    #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
75    pub struct DummyProperties<T> {
76        _marker: PhantomData<T>,
77    }
78
79    impl<T> EnforceSecret for DummyProperties<T> {}
80
81    /// See [`crate::source::util::dummy`].
82    pub struct DummySplitEnumerator<T> {
83        _marker: PhantomData<T>,
84    }
85    /// See [`crate::source::util::dummy`].
86    #[derive(Deserialize, Debug, PartialEq, Clone)]
87    pub struct DummySplit<T> {
88        _marker: PhantomData<T>,
89    }
90
91    impl<T> UnknownFields for DummyProperties<T> {
92        fn unknown_fields(&self) -> HashMap<String, String> {
93            HashMap::new()
94        }
95    }
96
97    impl<T> SplitMetaData for DummySplit<T> {
98        fn id(&self) -> SplitId {
99            unreachable!()
100        }
101
102        fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
103            unreachable!()
104        }
105
106        fn encode_to_json(&self) -> JsonbVal {
107            unreachable!()
108        }
109
110        fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
111            unreachable!()
112        }
113    }
114
115    #[async_trait::async_trait]
116    impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
117        type Properties = DummyProperties<T>;
118        type Split = DummySplit<T>;
119
120        async fn new(
121            _properties: Self::Properties,
122            _context: SourceEnumeratorContextRef,
123        ) -> crate::error::ConnectorResult<Self> {
124            Ok(Self {
125                _marker: PhantomData,
126            })
127        }
128
129        async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
130            // no op
131            Ok(vec![])
132        }
133    }
134
135    pub struct DummySourceReader<T> {
136        _marker: PhantomData<T>,
137    }
138
139    #[async_trait::async_trait]
140    impl<T: Send> SplitReader for DummySourceReader<T> {
141        type Properties = DummyProperties<T>;
142        type Split = DummySplit<T>;
143
144        async fn new(
145            _props: Self::Properties,
146            _splits: Vec<Self::Split>,
147            _parser_config: ParserConfig,
148            _source_ctx: SourceContextRef,
149            _columns: Option<Vec<Column>>,
150        ) -> crate::error::ConnectorResult<Self> {
151            Ok(Self {
152                _marker: PhantomData,
153            })
154        }
155
156        fn into_stream(self) -> BoxSourceChunkStream {
157            unreachable!()
158        }
159    }
160}