risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::enforce_secret::EnforceSecret;
32use crate::error::ConnectorError;
33use crate::sink::Result;
34
/// Name of the `delimiter` sink option.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Name of the `index_column` sink option (the column supplying a per-row index).
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Name of the `index` sink option (a fixed index name).
pub const ES_OPTION_INDEX: &str = "index";
/// Name of the `routing_column` sink option (the column supplying a per-row route).
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
39
40#[serde_as]
41#[derive(Deserialize, Debug, Clone, WithOptions)]
42pub struct ElasticSearchConfig {
43    #[serde(flatten)]
44    pub inner: ElasticSearchOpenSearchConfig,
45}
46
47#[serde_as]
48#[derive(Deserialize, Debug, Clone, WithOptions)]
49pub struct OpenSearchConfig {
50    #[serde(flatten)]
51    pub inner: ElasticSearchOpenSearchConfig,
52}
53
54#[serde_as]
55#[derive(Deserialize, Debug, Clone, WithOptions)]
56pub struct ElasticSearchOpenSearchConfig {
57    #[serde(rename = "url")]
58    pub url: String,
59    /// The index's name of elasticsearch or openserach
60    #[serde(rename = "index")]
61    pub index: Option<String>,
62    /// If pk is set, then "pk1+delimiter+pk2+delimiter..." will be used as the key, if pk is not set, we will just use the first column as the key.
63    #[serde(rename = "delimiter")]
64    pub delimiter: Option<String>,
65    /// The username of elasticsearch or openserach
66    #[serde(rename = "username")]
67    pub username: Option<String>,
68    /// The username of elasticsearch or openserach
69    #[serde(rename = "password")]
70    pub password: Option<String>,
71    /// It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one
72    #[serde(rename = "index_column")]
73    pub index_column: Option<String>,
74
75    /// It is used for dynamic route, if it is be set, the value of this column will be used as the route
76    #[serde(rename = "routing_column")]
77    pub routing_column: Option<String>,
78
79    #[serde(rename = "retry_on_conflict")]
80    #[serde_as(as = "DisplayFromStr")]
81    #[serde(default = "default_retry_on_conflict")]
82    pub retry_on_conflict: i32,
83
84    #[serde(rename = "batch_num_messages")]
85    #[serde_as(as = "DisplayFromStr")]
86    #[serde(default = "default_batch_num_messages")]
87    pub batch_num_messages: usize,
88
89    #[serde(rename = "batch_size_kb")]
90    #[serde_as(as = "DisplayFromStr")]
91    #[serde(default = "default_batch_size_kb")]
92    pub batch_size_kb: usize,
93
94    #[serde(rename = "concurrent_requests")]
95    #[serde_as(as = "DisplayFromStr")]
96    #[serde(default = "default_concurrent_requests")]
97    pub concurrent_requests: usize,
98
99    #[serde(default = "default_type")]
100    pub r#type: String,
101}
102
103impl EnforceSecret for ElasticSearchOpenSearchConfig {
104    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
105        "username",
106        "password",
107    };
108}
109
/// Fallback for the `type` option when it is omitted: `"upsert"`.
fn default_type() -> String {
    String::from("upsert")
}
113
/// Fallback for the `retry_on_conflict` option when it is omitted.
fn default_retry_on_conflict() -> i32 {
    const DEFAULT_RETRY_ON_CONFLICT: i32 = 3;
    DEFAULT_RETRY_ON_CONFLICT
}
117
/// Fallback for the `batch_num_messages` option when it is omitted.
fn default_batch_num_messages() -> usize {
    const DEFAULT_BATCH_NUM_MESSAGES: usize = 512;
    DEFAULT_BATCH_NUM_MESSAGES
}
121
/// Fallback for the `batch_size_kb` option when it is omitted: 5 * 1024 kilobytes.
fn default_batch_size_kb() -> usize {
    const KB_PER_MB: usize = 1024;
    5 * KB_PER_MB
}
125
/// Fallback for the `concurrent_requests` option when it is omitted.
fn default_concurrent_requests() -> usize {
    const DEFAULT_CONCURRENT_REQUESTS: usize = 1024;
    DEFAULT_CONCURRENT_REQUESTS
}
129
130impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
131    type Error = ConnectorError;
132
133    fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
134        let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; // from ElasticsearchOpenSearchConfig
135
136        for k in value.0.keys() {
137            if !allowed_fields.contains(k.as_str()) {
138                return Err(ConnectorError::from(anyhow!(
139                    "Invalid field: {}, allowed fields: {:?}",
140                    k,
141                    allowed_fields
142                )));
143            }
144        }
145
146        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
147            serde_json::to_value(value.0.clone()).unwrap(),
148        )
149        .map_err(|e| SinkError::Config(anyhow!(e)))?;
150        Ok(config)
151    }
152}
153
154impl ElasticSearchConfig {
155    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
156        let config = serde_json::from_value::<ElasticSearchConfig>(
157            serde_json::to_value(properties).unwrap(),
158        )
159        .map_err(|e| SinkError::Config(anyhow!(e)))?;
160        Ok(config)
161    }
162}
163
164impl OpenSearchConfig {
165    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
166        let config =
167            serde_json::from_value::<OpenSearchConfig>(serde_json::to_value(properties).unwrap())
168                .map_err(|e| SinkError::Config(anyhow!(e)))?;
169        Ok(config)
170    }
171}
172
173impl ElasticSearchOpenSearchConfig {
174    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
175        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
176            serde_json::to_value(properties).unwrap(),
177        )
178        .map_err(|e| SinkError::Config(anyhow!(e)))?;
179        Ok(config)
180    }
181
182    pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
183        let check_username_password = || -> Result<()> {
184            if self.username.is_some() && self.password.is_none() {
185                return Err(SinkError::Config(anyhow!(
186                    "please set the password when the username is set."
187                )));
188            }
189            if self.username.is_none() && self.password.is_some() {
190                return Err(SinkError::Config(anyhow!(
191                    "please set the username when the password is set."
192                )));
193            }
194            Ok(())
195        };
196        let url =
197            Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
198        if connector.eq(ES_SINK) {
199            let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
200                elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
201            );
202            if let Some(username) = &self.username
203                && let Some(password) = &self.password
204            {
205                transport_builder = transport_builder.auth(
206                    elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
207                );
208            }
209            check_username_password()?;
210            let transport = transport_builder
211                .build()
212                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
213            let client = elasticsearch::Elasticsearch::new(transport);
214            Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
215        } else if connector.eq(OPENSEARCH_SINK) {
216            let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
217                opensearch::http::transport::SingleNodeConnectionPool::new(url),
218            );
219            if let Some(username) = &self.username
220                && let Some(password) = &self.password
221            {
222                transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
223                    username.clone(),
224                    password.clone(),
225                ));
226            }
227            check_username_password()?;
228            let transport = transport_builder
229                .build()
230                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
231            let client = opensearch::OpenSearch::new(transport);
232            Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
233        } else {
234            panic!(
235                "connector type must be {} or {}, but get {}",
236                ES_SINK, OPENSEARCH_SINK, connector
237            );
238        }
239    }
240
241    pub fn validate_config(&self, schema: &Schema) -> Result<()> {
242        if self.index_column.is_some() && self.index.is_some()
243            || self.index_column.is_none() && self.index.is_none()
244        {
245            return Err(SinkError::Config(anyhow!(
246                "please set only one of the 'index_column' or 'index' properties."
247            )));
248        }
249
250        if let Some(index_column) = &self.index_column {
251            let filed = schema
252                .fields()
253                .iter()
254                .find(|f| &f.name == index_column)
255                .unwrap();
256            if filed.data_type() != DataType::Varchar {
257                return Err(SinkError::Config(anyhow!(
258                    "please ensure the data type of {} is varchar.",
259                    index_column
260                )));
261            }
262        }
263
264        if let Some(routing_column) = &self.routing_column {
265            let filed = schema
266                .fields()
267                .iter()
268                .find(|f| &f.name == routing_column)
269                .unwrap();
270            if filed.data_type() != DataType::Varchar {
271                return Err(SinkError::Config(anyhow!(
272                    "please ensure the data type of {} is varchar.",
273                    routing_column
274                )));
275            }
276        }
277        Ok(())
278    }
279
280    pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
281        let index_column_idx = self
282            .index_column
283            .as_ref()
284            .map(|n| {
285                schema
286                    .fields()
287                    .iter()
288                    .position(|s| &s.name == n)
289                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
290            })
291            .transpose()?;
292        Ok(index_column_idx)
293    }
294
295    pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
296        let routing_column_idx = self
297            .routing_column
298            .as_ref()
299            .map(|n| {
300                schema
301                    .fields()
302                    .iter()
303                    .position(|s| &s.name == n)
304                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
305            })
306            .transpose()?;
307        Ok(routing_column_idx)
308    }
309}