risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs
1use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::error::ConnectorError;
32use crate::sink::Result;
33
/// Property key of the `delimiter` option.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Property key of the `index_column` option (per-row index taken from a column).
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Property key of the static `index` option.
pub const ES_OPTION_INDEX: &str = "index";
/// Property key of the `routing_column` option (per-row routing taken from a column).
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
38
39#[serde_as]
40#[derive(Deserialize, Debug, Clone, WithOptions)]
41pub struct ElasticSearchOpenSearchConfig {
42 #[serde(rename = "url")]
43 pub url: String,
44 #[serde(rename = "index")]
46 pub index: Option<String>,
47 #[serde(rename = "delimiter")]
49 pub delimiter: Option<String>,
50 #[serde(rename = "username")]
52 pub username: Option<String>,
53 #[serde(rename = "password")]
55 pub password: Option<String>,
56 #[serde(rename = "index_column")]
58 pub index_column: Option<String>,
59
60 #[serde(rename = "routing_column")]
62 pub routing_column: Option<String>,
63
64 #[serde(rename = "retry_on_conflict")]
65 #[serde_as(as = "DisplayFromStr")]
66 #[serde(default = "default_retry_on_conflict")]
67 pub retry_on_conflict: i32,
68
69 #[serde(rename = "batch_num_messages")]
70 #[serde_as(as = "DisplayFromStr")]
71 #[serde(default = "default_batch_num_messages")]
72 pub batch_num_messages: usize,
73
74 #[serde(rename = "batch_size_kb")]
75 #[serde_as(as = "DisplayFromStr")]
76 #[serde(default = "default_batch_size_kb")]
77 pub batch_size_kb: usize,
78
79 #[serde(rename = "concurrent_requests")]
80 #[serde_as(as = "DisplayFromStr")]
81 #[serde(default = "default_concurrent_requests")]
82 pub concurrent_requests: usize,
83
84 #[serde(default = "default_type")]
85 pub r#type: String,
86}
87
/// Serde default for the `type` option.
fn default_type() -> String {
    String::from("upsert")
}
91
/// Serde default for the `retry_on_conflict` option.
fn default_retry_on_conflict() -> i32 {
    3_i32
}
95
/// Serde default for the `batch_num_messages` option.
fn default_batch_num_messages() -> usize {
    512_usize
}
99
/// Serde default for the `batch_size_kb` option: 5 * 1024 KB.
fn default_batch_size_kb() -> usize {
    const KB_PER_MB: usize = 1024;
    5 * KB_PER_MB
}
103
/// Serde default for the `concurrent_requests` option.
fn default_concurrent_requests() -> usize {
    1_024_usize
}
107
108impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
109 type Error = ConnectorError;
110
111 fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
112 let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; for k in value.0.keys() {
115 if !allowed_fields.contains(k.as_str()) {
116 return Err(ConnectorError::from(anyhow!(
117 "Invalid field: {}, allowed fields: {:?}",
118 k,
119 allowed_fields
120 )));
121 }
122 }
123
124 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
125 serde_json::to_value(value.0.clone()).unwrap(),
126 )
127 .map_err(|e| SinkError::Config(anyhow!(e)))?;
128 Ok(config)
129 }
130}
131
132impl ElasticSearchOpenSearchConfig {
133 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
134 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
135 serde_json::to_value(properties).unwrap(),
136 )
137 .map_err(|e| SinkError::Config(anyhow!(e)))?;
138 Ok(config)
139 }
140
141 pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
142 let check_username_password = || -> Result<()> {
143 if self.username.is_some() && self.password.is_none() {
144 return Err(SinkError::Config(anyhow!(
145 "please set the password when the username is set."
146 )));
147 }
148 if self.username.is_none() && self.password.is_some() {
149 return Err(SinkError::Config(anyhow!(
150 "please set the username when the password is set."
151 )));
152 }
153 Ok(())
154 };
155 let url =
156 Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
157 if connector.eq(ES_SINK) {
158 let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
159 elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
160 );
161 if let Some(username) = &self.username
162 && let Some(password) = &self.password
163 {
164 transport_builder = transport_builder.auth(
165 elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
166 );
167 }
168 check_username_password()?;
169 let transport = transport_builder
170 .build()
171 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
172 let client = elasticsearch::Elasticsearch::new(transport);
173 Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
174 } else if connector.eq(OPENSEARCH_SINK) {
175 let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
176 opensearch::http::transport::SingleNodeConnectionPool::new(url),
177 );
178 if let Some(username) = &self.username
179 && let Some(password) = &self.password
180 {
181 transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
182 username.clone(),
183 password.clone(),
184 ));
185 }
186 check_username_password()?;
187 let transport = transport_builder
188 .build()
189 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
190 let client = opensearch::OpenSearch::new(transport);
191 Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
192 } else {
193 panic!(
194 "connector type must be {} or {}, but get {}",
195 ES_SINK, OPENSEARCH_SINK, connector
196 );
197 }
198 }
199
200 pub fn validate_config(&self, schema: &Schema) -> Result<()> {
201 if self.index_column.is_some() && self.index.is_some()
202 || self.index_column.is_none() && self.index.is_none()
203 {
204 return Err(SinkError::Config(anyhow!(
205 "please set only one of the 'index_column' or 'index' properties."
206 )));
207 }
208
209 if let Some(index_column) = &self.index_column {
210 let filed = schema
211 .fields()
212 .iter()
213 .find(|f| &f.name == index_column)
214 .unwrap();
215 if filed.data_type() != DataType::Varchar {
216 return Err(SinkError::Config(anyhow!(
217 "please ensure the data type of {} is varchar.",
218 index_column
219 )));
220 }
221 }
222
223 if let Some(routing_column) = &self.routing_column {
224 let filed = schema
225 .fields()
226 .iter()
227 .find(|f| &f.name == routing_column)
228 .unwrap();
229 if filed.data_type() != DataType::Varchar {
230 return Err(SinkError::Config(anyhow!(
231 "please ensure the data type of {} is varchar.",
232 routing_column
233 )));
234 }
235 }
236 Ok(())
237 }
238
239 pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
240 let index_column_idx = self
241 .index_column
242 .as_ref()
243 .map(|n| {
244 schema
245 .fields()
246 .iter()
247 .position(|s| &s.name == n)
248 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
249 })
250 .transpose()?;
251 Ok(index_column_idx)
252 }
253
254 pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
255 let routing_column_idx = self
256 .routing_column
257 .as_ref()
258 .map(|n| {
259 schema
260 .fields()
261 .iter()
262 .position(|s| &s.name == n)
263 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
264 })
265 .transpose()?;
266 Ok(routing_column_idx)
267 }
268}