risingwave_connector/source/
util.rs1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use crate::error::{ConnectorError, ConnectorResult};
19use crate::source::SplitImpl;
20use crate::source::nats::split::NatsSplit;
21
22pub fn fill_adaptive_split(
23 split_template: &SplitImpl,
24 actor_in_use: &HashSet<u32>,
25) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
26 if let SplitImpl::Nats(split) = split_template {
28 let mut new_splits = BTreeMap::new();
29 for actor_id in actor_in_use {
30 let actor_id: Arc<str> = actor_id.to_string().into();
31 new_splits.insert(
32 actor_id.clone(),
33 SplitImpl::Nats(NatsSplit::new(
34 split.subject.clone(),
35 actor_id,
36 split.start_sequence.clone(),
37 )),
38 );
39 }
40 tracing::debug!(
41 "Filled adaptive splits for Nats source, {} splits in total",
42 new_splits.len()
43 );
44 Ok(new_splits)
45 } else {
46 Err(ConnectorError::from(anyhow::anyhow!(
47 "Unsupported split type, expect Nats SplitImpl but get {:?}",
48 split_template
49 )))
50 }
51}
52
53pub mod dummy {
58 use std::collections::HashMap;
59 use std::marker::PhantomData;
60
61 use risingwave_common::types::JsonbVal;
62 use serde::Deserialize;
63 use with_options::WithOptions;
64
65 use crate::enforce_secret::EnforceSecret;
66 use crate::error::ConnectorResult;
67 use crate::parser::ParserConfig;
68 use crate::source::{
69 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
70 SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
71 };
72
73 #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
75 pub struct DummyProperties<T> {
76 _marker: PhantomData<T>,
77 }
78
79 impl<T> EnforceSecret for DummyProperties<T> {}
80
81 pub struct DummySplitEnumerator<T> {
83 _marker: PhantomData<T>,
84 }
85 #[derive(Deserialize, Debug, PartialEq, Clone)]
87 pub struct DummySplit<T> {
88 _marker: PhantomData<T>,
89 }
90
91 impl<T> UnknownFields for DummyProperties<T> {
92 fn unknown_fields(&self) -> HashMap<String, String> {
93 HashMap::new()
94 }
95 }
96
97 impl<T> SplitMetaData for DummySplit<T> {
98 fn id(&self) -> SplitId {
99 unreachable!()
100 }
101
102 fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
103 unreachable!()
104 }
105
106 fn encode_to_json(&self) -> JsonbVal {
107 unreachable!()
108 }
109
110 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
111 unreachable!()
112 }
113 }
114
115 #[async_trait::async_trait]
116 impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
117 type Properties = DummyProperties<T>;
118 type Split = DummySplit<T>;
119
120 async fn new(
121 _properties: Self::Properties,
122 _context: SourceEnumeratorContextRef,
123 ) -> crate::error::ConnectorResult<Self> {
124 Ok(Self {
125 _marker: PhantomData,
126 })
127 }
128
129 async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
130 Ok(vec![])
132 }
133 }
134
135 pub struct DummySourceReader<T> {
136 _marker: PhantomData<T>,
137 }
138
139 #[async_trait::async_trait]
140 impl<T: Send> SplitReader for DummySourceReader<T> {
141 type Properties = DummyProperties<T>;
142 type Split = DummySplit<T>;
143
144 async fn new(
145 _props: Self::Properties,
146 _splits: Vec<Self::Split>,
147 _parser_config: ParserConfig,
148 _source_ctx: SourceContextRef,
149 _columns: Option<Vec<Column>>,
150 ) -> crate::error::ConnectorResult<Self> {
151 Ok(Self {
152 _marker: PhantomData,
153 })
154 }
155
156 fn into_stream(self) -> BoxSourceChunkStream {
157 unreachable!()
158 }
159 }
160}