risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::error::ConnectorError;
32use crate::sink::Result;
33
/// Property key `delimiter`; the same string as the serde name of
/// `ElasticSearchOpenSearchConfig::delimiter`.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Property key `index_column`; the same string as the serde name of
/// `ElasticSearchOpenSearchConfig::index_column`. Also used in the error
/// reported when that column cannot be found in the schema.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Property key `index`; the same string as the serde name of
/// `ElasticSearchOpenSearchConfig::index`.
pub const ES_OPTION_INDEX: &str = "index";
/// Property key `routing_column`; the same string as the serde name of
/// `ElasticSearchOpenSearchConfig::routing_column`. Also used in the error
/// reported when that column cannot be found in the schema.
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
38
39#[serde_as]
40#[derive(Deserialize, Debug, Clone, WithOptions)]
41pub struct ElasticSearchOpenSearchConfig {
42    #[serde(rename = "url")]
43    pub url: String,
44    /// The index's name of elasticsearch or openserach
45    #[serde(rename = "index")]
46    pub index: Option<String>,
47    /// If pk is set, then "pk1+delimiter+pk2+delimiter..." will be used as the key, if pk is not set, we will just use the first column as the key.
48    #[serde(rename = "delimiter")]
49    pub delimiter: Option<String>,
50    /// The username of elasticsearch or openserach
51    #[serde(rename = "username")]
52    pub username: Option<String>,
53    /// The username of elasticsearch or openserach
54    #[serde(rename = "password")]
55    pub password: Option<String>,
56    /// It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one
57    #[serde(rename = "index_column")]
58    pub index_column: Option<String>,
59
60    /// It is used for dynamic route, if it is be set, the value of this column will be used as the route
61    #[serde(rename = "routing_column")]
62    pub routing_column: Option<String>,
63
64    #[serde(rename = "retry_on_conflict")]
65    #[serde_as(as = "DisplayFromStr")]
66    #[serde(default = "default_retry_on_conflict")]
67    pub retry_on_conflict: i32,
68
69    #[serde(rename = "batch_num_messages")]
70    #[serde_as(as = "DisplayFromStr")]
71    #[serde(default = "default_batch_num_messages")]
72    pub batch_num_messages: usize,
73
74    #[serde(rename = "batch_size_kb")]
75    #[serde_as(as = "DisplayFromStr")]
76    #[serde(default = "default_batch_size_kb")]
77    pub batch_size_kb: usize,
78
79    #[serde(rename = "concurrent_requests")]
80    #[serde_as(as = "DisplayFromStr")]
81    #[serde(default = "default_concurrent_requests")]
82    pub concurrent_requests: usize,
83
84    #[serde(default = "default_type")]
85    pub r#type: String,
86}
87
/// Serde default for the `type` option: the string `"upsert"`.
fn default_type() -> String {
    String::from("upsert")
}
91
/// Serde default for the `retry_on_conflict` option.
fn default_retry_on_conflict() -> i32 {
    const DEFAULT_RETRY_ON_CONFLICT: i32 = 3;
    DEFAULT_RETRY_ON_CONFLICT
}
95
/// Serde default for the `batch_num_messages` option.
fn default_batch_num_messages() -> usize {
    const DEFAULT_BATCH_NUM_MESSAGES: usize = 512;
    DEFAULT_BATCH_NUM_MESSAGES
}
99
/// Serde default for the `batch_size_kb` option: `5 * 1024`.
fn default_batch_size_kb() -> usize {
    const FACTOR: usize = 1024;
    const DEFAULT_BATCH_SIZE_KB: usize = 5 * FACTOR;
    DEFAULT_BATCH_SIZE_KB
}
103
/// Serde default for the `concurrent_requests` option.
fn default_concurrent_requests() -> usize {
    const DEFAULT_CONCURRENT_REQUESTS: usize = 1024;
    DEFAULT_CONCURRENT_REQUESTS
}
107
108impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
109    type Error = ConnectorError;
110
111    fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
112        let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; // from ElasticsearchOpenSearchConfig
113
114        for k in value.0.keys() {
115            if !allowed_fields.contains(k.as_str()) {
116                return Err(ConnectorError::from(anyhow!(
117                    "Invalid field: {}, allowed fields: {:?}",
118                    k,
119                    allowed_fields
120                )));
121            }
122        }
123
124        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
125            serde_json::to_value(value.0.clone()).unwrap(),
126        )
127        .map_err(|e| SinkError::Config(anyhow!(e)))?;
128        Ok(config)
129    }
130}
131
132impl ElasticSearchOpenSearchConfig {
133    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
134        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
135            serde_json::to_value(properties).unwrap(),
136        )
137        .map_err(|e| SinkError::Config(anyhow!(e)))?;
138        Ok(config)
139    }
140
141    pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
142        let check_username_password = || -> Result<()> {
143            if self.username.is_some() && self.password.is_none() {
144                return Err(SinkError::Config(anyhow!(
145                    "please set the password when the username is set."
146                )));
147            }
148            if self.username.is_none() && self.password.is_some() {
149                return Err(SinkError::Config(anyhow!(
150                    "please set the username when the password is set."
151                )));
152            }
153            Ok(())
154        };
155        let url =
156            Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
157        if connector.eq(ES_SINK) {
158            let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
159                elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
160            );
161            if let Some(username) = &self.username
162                && let Some(password) = &self.password
163            {
164                transport_builder = transport_builder.auth(
165                    elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
166                );
167            }
168            check_username_password()?;
169            let transport = transport_builder
170                .build()
171                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
172            let client = elasticsearch::Elasticsearch::new(transport);
173            Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
174        } else if connector.eq(OPENSEARCH_SINK) {
175            let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
176                opensearch::http::transport::SingleNodeConnectionPool::new(url),
177            );
178            if let Some(username) = &self.username
179                && let Some(password) = &self.password
180            {
181                transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
182                    username.clone(),
183                    password.clone(),
184                ));
185            }
186            check_username_password()?;
187            let transport = transport_builder
188                .build()
189                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
190            let client = opensearch::OpenSearch::new(transport);
191            Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
192        } else {
193            panic!(
194                "connector type must be {} or {}, but get {}",
195                ES_SINK, OPENSEARCH_SINK, connector
196            );
197        }
198    }
199
200    pub fn validate_config(&self, schema: &Schema) -> Result<()> {
201        if self.index_column.is_some() && self.index.is_some()
202            || self.index_column.is_none() && self.index.is_none()
203        {
204            return Err(SinkError::Config(anyhow!(
205                "please set only one of the 'index_column' or 'index' properties."
206            )));
207        }
208
209        if let Some(index_column) = &self.index_column {
210            let filed = schema
211                .fields()
212                .iter()
213                .find(|f| &f.name == index_column)
214                .unwrap();
215            if filed.data_type() != DataType::Varchar {
216                return Err(SinkError::Config(anyhow!(
217                    "please ensure the data type of {} is varchar.",
218                    index_column
219                )));
220            }
221        }
222
223        if let Some(routing_column) = &self.routing_column {
224            let filed = schema
225                .fields()
226                .iter()
227                .find(|f| &f.name == routing_column)
228                .unwrap();
229            if filed.data_type() != DataType::Varchar {
230                return Err(SinkError::Config(anyhow!(
231                    "please ensure the data type of {} is varchar.",
232                    routing_column
233                )));
234            }
235        }
236        Ok(())
237    }
238
239    pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
240        let index_column_idx = self
241            .index_column
242            .as_ref()
243            .map(|n| {
244                schema
245                    .fields()
246                    .iter()
247                    .position(|s| &s.name == n)
248                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
249            })
250            .transpose()?;
251        Ok(index_column_idx)
252    }
253
254    pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
255        let routing_column_idx = self
256            .routing_column
257            .as_ref()
258            .map(|n| {
259                schema
260                    .fields()
261                    .iter()
262                    .position(|s| &s.name == n)
263                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
264            })
265            .transpose()?;
266        Ok(routing_column_idx)
267    }
268}