risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs1use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::enforce_secret::EnforceSecret;
32use crate::error::ConnectorError;
33use crate::sink::Result;
34
/// Property key holding the static target index name.
pub const ES_OPTION_INDEX: &str = "index";
/// Property key naming the column whose value selects the target index.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Property key naming the column whose value is used for routing.
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
/// Property key for the `delimiter` option.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
39
40#[serde_as]
41#[derive(Deserialize, Debug, Clone, WithOptions)]
42pub struct ElasticSearchOpenSearchConfig {
43 #[serde(rename = "url")]
44 pub url: String,
45 #[serde(rename = "index")]
47 pub index: Option<String>,
48 #[serde(rename = "delimiter")]
50 pub delimiter: Option<String>,
51 #[serde(rename = "username")]
53 pub username: Option<String>,
54 #[serde(rename = "password")]
56 pub password: Option<String>,
57 #[serde(rename = "index_column")]
59 pub index_column: Option<String>,
60
61 #[serde(rename = "routing_column")]
63 pub routing_column: Option<String>,
64
65 #[serde(rename = "retry_on_conflict")]
66 #[serde_as(as = "DisplayFromStr")]
67 #[serde(default = "default_retry_on_conflict")]
68 pub retry_on_conflict: i32,
69
70 #[serde(rename = "batch_num_messages")]
71 #[serde_as(as = "DisplayFromStr")]
72 #[serde(default = "default_batch_num_messages")]
73 pub batch_num_messages: usize,
74
75 #[serde(rename = "batch_size_kb")]
76 #[serde_as(as = "DisplayFromStr")]
77 #[serde(default = "default_batch_size_kb")]
78 pub batch_size_kb: usize,
79
80 #[serde(rename = "concurrent_requests")]
81 #[serde_as(as = "DisplayFromStr")]
82 #[serde(default = "default_concurrent_requests")]
83 pub concurrent_requests: usize,
84
85 #[serde(default = "default_type")]
86 pub r#type: String,
87}
88
89impl EnforceSecret for ElasticSearchOpenSearchConfig {
90 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
91 "username",
92 "password",
93 };
94}
95
/// Serde default for the `type` option.
fn default_type() -> String {
    String::from("upsert")
}
99
/// Serde default for the `retry_on_conflict` option.
fn default_retry_on_conflict() -> i32 {
    3_i32
}
103
/// Serde default for the `batch_num_messages` option.
fn default_batch_num_messages() -> usize {
    512_usize
}
107
/// Serde default for the `batch_size_kb` option: 5 MiB, expressed in KiB.
fn default_batch_size_kb() -> usize {
    const KIB_PER_MIB: usize = 1024;
    5 * KIB_PER_MIB
}
111
/// Serde default for the `concurrent_requests` option.
fn default_concurrent_requests() -> usize {
    1_024
}
115
116impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
117 type Error = ConnectorError;
118
119 fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
120 let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; for k in value.0.keys() {
123 if !allowed_fields.contains(k.as_str()) {
124 return Err(ConnectorError::from(anyhow!(
125 "Invalid field: {}, allowed fields: {:?}",
126 k,
127 allowed_fields
128 )));
129 }
130 }
131
132 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
133 serde_json::to_value(value.0.clone()).unwrap(),
134 )
135 .map_err(|e| SinkError::Config(anyhow!(e)))?;
136 Ok(config)
137 }
138}
139
140impl ElasticSearchOpenSearchConfig {
141 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
142 let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
143 serde_json::to_value(properties).unwrap(),
144 )
145 .map_err(|e| SinkError::Config(anyhow!(e)))?;
146 Ok(config)
147 }
148
149 pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
150 let check_username_password = || -> Result<()> {
151 if self.username.is_some() && self.password.is_none() {
152 return Err(SinkError::Config(anyhow!(
153 "please set the password when the username is set."
154 )));
155 }
156 if self.username.is_none() && self.password.is_some() {
157 return Err(SinkError::Config(anyhow!(
158 "please set the username when the password is set."
159 )));
160 }
161 Ok(())
162 };
163 let url =
164 Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
165 if connector.eq(ES_SINK) {
166 let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
167 elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
168 );
169 if let Some(username) = &self.username
170 && let Some(password) = &self.password
171 {
172 transport_builder = transport_builder.auth(
173 elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
174 );
175 }
176 check_username_password()?;
177 let transport = transport_builder
178 .build()
179 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
180 let client = elasticsearch::Elasticsearch::new(transport);
181 Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
182 } else if connector.eq(OPENSEARCH_SINK) {
183 let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
184 opensearch::http::transport::SingleNodeConnectionPool::new(url),
185 );
186 if let Some(username) = &self.username
187 && let Some(password) = &self.password
188 {
189 transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
190 username.clone(),
191 password.clone(),
192 ));
193 }
194 check_username_password()?;
195 let transport = transport_builder
196 .build()
197 .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
198 let client = opensearch::OpenSearch::new(transport);
199 Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
200 } else {
201 panic!(
202 "connector type must be {} or {}, but get {}",
203 ES_SINK, OPENSEARCH_SINK, connector
204 );
205 }
206 }
207
208 pub fn validate_config(&self, schema: &Schema) -> Result<()> {
209 if self.index_column.is_some() && self.index.is_some()
210 || self.index_column.is_none() && self.index.is_none()
211 {
212 return Err(SinkError::Config(anyhow!(
213 "please set only one of the 'index_column' or 'index' properties."
214 )));
215 }
216
217 if let Some(index_column) = &self.index_column {
218 let filed = schema
219 .fields()
220 .iter()
221 .find(|f| &f.name == index_column)
222 .unwrap();
223 if filed.data_type() != DataType::Varchar {
224 return Err(SinkError::Config(anyhow!(
225 "please ensure the data type of {} is varchar.",
226 index_column
227 )));
228 }
229 }
230
231 if let Some(routing_column) = &self.routing_column {
232 let filed = schema
233 .fields()
234 .iter()
235 .find(|f| &f.name == routing_column)
236 .unwrap();
237 if filed.data_type() != DataType::Varchar {
238 return Err(SinkError::Config(anyhow!(
239 "please ensure the data type of {} is varchar.",
240 routing_column
241 )));
242 }
243 }
244 Ok(())
245 }
246
247 pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
248 let index_column_idx = self
249 .index_column
250 .as_ref()
251 .map(|n| {
252 schema
253 .fields()
254 .iter()
255 .position(|s| &s.name == n)
256 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
257 })
258 .transpose()?;
259 Ok(index_column_idx)
260 }
261
262 pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
263 let routing_column_idx = self
264 .routing_column
265 .as_ref()
266 .map(|n| {
267 schema
268 .fields()
269 .iter()
270 .position(|s| &s.name == n)
271 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
272 })
273 .transpose()?;
274 Ok(routing_column_idx)
275 }
276}