risingwave_connector/source/
util.rs1use std::collections::{BTreeMap, HashSet};
16use std::sync::Arc;
17
18use crate::error::{ConnectorError, ConnectorResult};
19use crate::source::SplitImpl;
20use crate::source::nats::split::NatsSplit;
21
22pub fn fill_adaptive_split(
23 split_template: &SplitImpl,
24 actor_in_use: &HashSet<u32>,
25) -> ConnectorResult<BTreeMap<Arc<str>, SplitImpl>> {
26 if let SplitImpl::Nats(split) = split_template {
28 let mut new_splits = BTreeMap::new();
29 for actor_id in actor_in_use {
30 let actor_id: Arc<str> = actor_id.to_string().into();
31 new_splits.insert(
32 actor_id.clone(),
33 SplitImpl::Nats(NatsSplit::new(
34 split.subject.clone(),
35 actor_id,
36 split.start_sequence.clone(),
37 )),
38 );
39 }
40 tracing::debug!(
41 "Filled adaptive splits for Nats source, {} splits in total",
42 new_splits.len()
43 );
44 Ok(new_splits)
45 } else {
46 Err(ConnectorError::from(anyhow::anyhow!(
47 "Unsupported split type, expect Nats SplitImpl but get {:?}",
48 split_template
49 )))
50 }
51}
52
53pub mod dummy {
58 use std::collections::HashMap;
59 use std::marker::PhantomData;
60
61 use risingwave_common::types::JsonbVal;
62 use serde::Deserialize;
63 use with_options::WithOptions;
64
65 use crate::error::ConnectorResult;
66 use crate::parser::ParserConfig;
67 use crate::source::{
68 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef,
69 SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
70 };
71
72 #[derive(Deserialize, Debug, Clone, WithOptions, Default)]
74 pub struct DummyProperties<T> {
75 _marker: PhantomData<T>,
76 }
77 pub struct DummySplitEnumerator<T> {
79 _marker: PhantomData<T>,
80 }
81 #[derive(Deserialize, Debug, PartialEq, Clone)]
83 pub struct DummySplit<T> {
84 _marker: PhantomData<T>,
85 }
86
87 impl<T> UnknownFields for DummyProperties<T> {
88 fn unknown_fields(&self) -> HashMap<String, String> {
89 HashMap::new()
90 }
91 }
92
93 impl<T> SplitMetaData for DummySplit<T> {
94 fn id(&self) -> SplitId {
95 unreachable!()
96 }
97
98 fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
99 unreachable!()
100 }
101
102 fn encode_to_json(&self) -> JsonbVal {
103 unreachable!()
104 }
105
106 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
107 unreachable!()
108 }
109 }
110
111 #[async_trait::async_trait]
112 impl<T: Send> SplitEnumerator for DummySplitEnumerator<T> {
113 type Properties = DummyProperties<T>;
114 type Split = DummySplit<T>;
115
116 async fn new(
117 _properties: Self::Properties,
118 _context: SourceEnumeratorContextRef,
119 ) -> crate::error::ConnectorResult<Self> {
120 Ok(Self {
121 _marker: PhantomData,
122 })
123 }
124
125 async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
126 Ok(vec![])
128 }
129 }
130
131 pub struct DummySourceReader<T> {
132 _marker: PhantomData<T>,
133 }
134
135 #[async_trait::async_trait]
136 impl<T: Send> SplitReader for DummySourceReader<T> {
137 type Properties = DummyProperties<T>;
138 type Split = DummySplit<T>;
139
140 async fn new(
141 _props: Self::Properties,
142 _splits: Vec<Self::Split>,
143 _parser_config: ParserConfig,
144 _source_ctx: SourceContextRef,
145 _columns: Option<Vec<Column>>,
146 ) -> crate::error::ConnectorResult<Self> {
147 Ok(Self {
148 _marker: PhantomData,
149 })
150 }
151
152 fn into_stream(self) -> BoxSourceChunkStream {
153 unreachable!()
154 }
155 }
156}