risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs1use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::enforce_secret::EnforceSecret;
32use crate::error::ConnectorError;
33use crate::sink::Result;
34
/// Property key of the static `index` option.
pub const ES_OPTION_INDEX: &str = "index";
/// Property key of the dynamic `index_column` option.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Property key of the `delimiter` option.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Property key of the `routing_column` option.
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
39
/// Options for the Elasticsearch sink.
///
/// This is a thin wrapper: every option is deserialized into the flattened
/// [`ElasticSearchOpenSearchConfig`], which is shared with the OpenSearch sink.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ElasticSearchConfig {
    /// The shared Elasticsearch/OpenSearch options.
    #[serde(flatten)]
    pub inner: ElasticSearchOpenSearchConfig,
}
46
/// Options for the OpenSearch sink.
///
/// This is a thin wrapper: every option is deserialized into the flattened
/// [`ElasticSearchOpenSearchConfig`], which is shared with the Elasticsearch sink.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct OpenSearchConfig {
    /// The shared Elasticsearch/OpenSearch options.
    #[serde(flatten)]
    pub inner: ElasticSearchOpenSearchConfig,
}
53
/// Options shared by the Elasticsearch and OpenSearch sinks.
///
/// Deserialized from a string-to-string property map (see `from_btreemap`). Numeric options
/// are parsed from their string form via `DisplayFromStr` and fall back to the `default_*`
/// functions when absent.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ElasticSearchOpenSearchConfig {
    /// URL of the cluster node; parsed with `Url::parse` when the client is built and used
    /// as a single-node connection pool.
    #[serde(rename = "url")]
    pub url: String,
    /// Static index name. Exactly one of `index` and `index_column` must be set.
    #[serde(rename = "index")]
    pub index: Option<String>,
    /// Value of the `delimiter` option.
    // NOTE(review): not read in this file — presumably used to join primary-key values into
    // a document id; confirm against the sink writer.
    #[serde(rename = "delimiter")]
    pub delimiter: Option<String>,
    /// Username for HTTP basic authentication; must be set together with `password`.
    #[serde(rename = "username")]
    pub username: Option<String>,
    /// Password for HTTP basic authentication; must be set together with `username`.
    #[serde(rename = "password")]
    pub password: Option<String>,
    /// Name of a `varchar` column in the sink schema used for dynamic index selection.
    /// Exactly one of `index` and `index_column` must be set.
    // NOTE(review): presumably each row's value in this column names its target index.
    #[serde(rename = "index_column")]
    pub index_column: Option<String>,

    /// Name of a `varchar` column in the sink schema used for routing.
    // NOTE(review): presumably each row's value is sent as its routing key; confirm.
    #[serde(rename = "routing_column")]
    pub routing_column: Option<String>,

    /// Value of the `retry_on_conflict` option; defaults to 3.
    #[serde(rename = "retry_on_conflict")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_retry_on_conflict")]
    pub retry_on_conflict: i32,

    /// Value of the `batch_num_messages` option; defaults to 512.
    #[serde(rename = "batch_num_messages")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_batch_num_messages")]
    pub batch_num_messages: usize,

    /// Value of the `batch_size_kb` option, in KB; defaults to 5 * 1024.
    #[serde(rename = "batch_size_kb")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_batch_size_kb")]
    pub batch_size_kb: usize,

    /// Value of the `concurrent_requests` option; defaults to 1024.
    #[serde(rename = "concurrent_requests")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_concurrent_requests")]
    pub concurrent_requests: usize,

    /// Value of the `type` option; defaults to `"upsert"`.
    #[serde(default = "default_type")]
    pub r#type: String,
}
102
103impl EnforceSecret for ElasticSearchOpenSearchConfig {
104 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
105 "username",
106 "password",
107 };
108}
109
/// Default value of the `type` option.
fn default_type() -> String {
    String::from("upsert")
}
113
/// Default value of the `retry_on_conflict` option.
fn default_retry_on_conflict() -> i32 {
    const DEFAULT_RETRY_ON_CONFLICT: i32 = 3;
    DEFAULT_RETRY_ON_CONFLICT
}
117
/// Default value of the `batch_num_messages` option.
fn default_batch_num_messages() -> usize {
    const DEFAULT_BATCH_NUM_MESSAGES: usize = 512;
    DEFAULT_BATCH_NUM_MESSAGES
}
121
/// Default value of the `batch_size_kb` option: 5 * 1024 KB.
fn default_batch_size_kb() -> usize {
    const KB_PER_MB: usize = 1024;
    5 * KB_PER_MB
}
125
/// Default value of the `concurrent_requests` option.
fn default_concurrent_requests() -> usize {
    const DEFAULT_CONCURRENT_REQUESTS: usize = 1024;
    DEFAULT_CONCURRENT_REQUESTS
}
129
130impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
131 type Error = ConnectorError;
132
133 fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
134 let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; for k in value.0.keys() {
137 if !allowed_fields.contains(k.as_str()) {
138 return Err(ConnectorError::from(anyhow!(
139 "Invalid field: {}, allowed fields: {:?}",
140 k,
141 allowed_fields
142 )));
143 }
144 }
145
146 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
147 serde_json::to_value(value.0.clone()).unwrap(),
148 )
149 .map_err(|e| SinkError::Config(anyhow!(e)))?;
150 Ok(config)
151 }
152}
153
154impl ElasticSearchConfig {
155 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
156 let config = serde_json::from_value::<ElasticSearchConfig>(
157 serde_json::to_value(properties).unwrap(),
158 )
159 .map_err(|e| SinkError::Config(anyhow!(e)))?;
160 Ok(config)
161 }
162}
163
164impl OpenSearchConfig {
165 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
166 let config =
167 serde_json::from_value::<OpenSearchConfig>(serde_json::to_value(properties).unwrap())
168 .map_err(|e| SinkError::Config(anyhow!(e)))?;
169 Ok(config)
170 }
171}
172
173impl ElasticSearchOpenSearchConfig {
174 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
175 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
176 serde_json::to_value(properties).unwrap(),
177 )
178 .map_err(|e| SinkError::Config(anyhow!(e)))?;
179 Ok(config)
180 }
181
182 pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
183 let check_username_password = || -> Result<()> {
184 if self.username.is_some() && self.password.is_none() {
185 return Err(SinkError::Config(anyhow!(
186 "please set the password when the username is set."
187 )));
188 }
189 if self.username.is_none() && self.password.is_some() {
190 return Err(SinkError::Config(anyhow!(
191 "please set the username when the password is set."
192 )));
193 }
194 Ok(())
195 };
196 let url =
197 Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
198 if connector.eq(ES_SINK) {
199 let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
200 elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
201 );
202 if let Some(username) = &self.username
203 && let Some(password) = &self.password
204 {
205 transport_builder = transport_builder.auth(
206 elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
207 );
208 }
209 check_username_password()?;
210 let transport = transport_builder
211 .build()
212 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
213 let client = elasticsearch::Elasticsearch::new(transport);
214 Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
215 } else if connector.eq(OPENSEARCH_SINK) {
216 let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
217 opensearch::http::transport::SingleNodeConnectionPool::new(url),
218 );
219 if let Some(username) = &self.username
220 && let Some(password) = &self.password
221 {
222 transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
223 username.clone(),
224 password.clone(),
225 ));
226 }
227 check_username_password()?;
228 let transport = transport_builder
229 .build()
230 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
231 let client = opensearch::OpenSearch::new(transport);
232 Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
233 } else {
234 panic!(
235 "connector type must be {} or {}, but get {}",
236 ES_SINK, OPENSEARCH_SINK, connector
237 );
238 }
239 }
240
241 pub fn validate_config(&self, schema: &Schema) -> Result<()> {
242 if self.index_column.is_some() && self.index.is_some()
243 || self.index_column.is_none() && self.index.is_none()
244 {
245 return Err(SinkError::Config(anyhow!(
246 "please set only one of the 'index_column' or 'index' properties."
247 )));
248 }
249
250 if let Some(index_column) = &self.index_column {
251 let filed = schema
252 .fields()
253 .iter()
254 .find(|f| &f.name == index_column)
255 .unwrap();
256 if filed.data_type() != DataType::Varchar {
257 return Err(SinkError::Config(anyhow!(
258 "please ensure the data type of {} is varchar.",
259 index_column
260 )));
261 }
262 }
263
264 if let Some(routing_column) = &self.routing_column {
265 let filed = schema
266 .fields()
267 .iter()
268 .find(|f| &f.name == routing_column)
269 .unwrap();
270 if filed.data_type() != DataType::Varchar {
271 return Err(SinkError::Config(anyhow!(
272 "please ensure the data type of {} is varchar.",
273 routing_column
274 )));
275 }
276 }
277 Ok(())
278 }
279
280 pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
281 let index_column_idx = self
282 .index_column
283 .as_ref()
284 .map(|n| {
285 schema
286 .fields()
287 .iter()
288 .position(|s| &s.name == n)
289 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
290 })
291 .transpose()?;
292 Ok(index_column_idx)
293 }
294
295 pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
296 let routing_column_idx = self
297 .routing_column
298 .as_ref()
299 .map(|n| {
300 schema
301 .fields()
302 .iter()
303 .position(|s| &s.name == n)
304 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
305 })
306 .transpose()?;
307 Ok(routing_column_idx)
308 }
309}