risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use anyhow::anyhow;
18use maplit::hashset;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use serde::Deserialize;
22use serde_with::{DisplayFromStr, serde_as};
23use url::Url;
24use with_options::WithOptions;
25
26use super::super::SinkError;
27use super::elasticsearch::ES_SINK;
28use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
29use super::opensearch::OPENSEARCH_SINK;
30use crate::connector_common::ElasticsearchConnection;
31use crate::enforce_secret::EnforceSecret;
32use crate::error::ConnectorError;
33use crate::sink::Result;
34
/// Option key for the primary-key delimiter (see [`ElasticSearchOpenSearchConfig::delimiter`]).
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Option key naming the column whose value is used as a dynamic index.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Option key for a fixed index name.
pub const ES_OPTION_INDEX: &str = "index";
/// Option key naming the column whose value is used as a dynamic routing value.
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
39
40#[serde_as]
41#[derive(Deserialize, Debug, Clone, WithOptions)]
42pub struct ElasticSearchOpenSearchConfig {
43    #[serde(rename = "url")]
44    pub url: String,
45    /// The index's name of elasticsearch or openserach
46    #[serde(rename = "index")]
47    pub index: Option<String>,
48    /// If pk is set, then "pk1+delimiter+pk2+delimiter..." will be used as the key, if pk is not set, we will just use the first column as the key.
49    #[serde(rename = "delimiter")]
50    pub delimiter: Option<String>,
51    /// The username of elasticsearch or openserach
52    #[serde(rename = "username")]
53    pub username: Option<String>,
54    /// The username of elasticsearch or openserach
55    #[serde(rename = "password")]
56    pub password: Option<String>,
57    /// It is used for dynamic index, if it is be set, the value of this column will be used as the index. It and `index` can only set one
58    #[serde(rename = "index_column")]
59    pub index_column: Option<String>,
60
61    /// It is used for dynamic route, if it is be set, the value of this column will be used as the route
62    #[serde(rename = "routing_column")]
63    pub routing_column: Option<String>,
64
65    #[serde(rename = "retry_on_conflict")]
66    #[serde_as(as = "DisplayFromStr")]
67    #[serde(default = "default_retry_on_conflict")]
68    pub retry_on_conflict: i32,
69
70    #[serde(rename = "batch_num_messages")]
71    #[serde_as(as = "DisplayFromStr")]
72    #[serde(default = "default_batch_num_messages")]
73    pub batch_num_messages: usize,
74
75    #[serde(rename = "batch_size_kb")]
76    #[serde_as(as = "DisplayFromStr")]
77    #[serde(default = "default_batch_size_kb")]
78    pub batch_size_kb: usize,
79
80    #[serde(rename = "concurrent_requests")]
81    #[serde_as(as = "DisplayFromStr")]
82    #[serde(default = "default_concurrent_requests")]
83    pub concurrent_requests: usize,
84
85    #[serde(default = "default_type")]
86    pub r#type: String,
87}
88
89impl EnforceSecret for ElasticSearchOpenSearchConfig {
90    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
91        "username",
92        "password",
93    };
94}
95
/// Value used for the `type` option when it is not provided.
fn default_type() -> String {
    String::from("upsert")
}
99
/// Value used for the `retry_on_conflict` option when it is not provided.
fn default_retry_on_conflict() -> i32 {
    3_i32
}
103
/// Value used for the `batch_num_messages` option when it is not provided.
fn default_batch_num_messages() -> usize {
    512_usize
}
107
/// Value used for the `batch_size_kb` option when it is not provided: 5 MiB expressed in KiB.
fn default_batch_size_kb() -> usize {
    const KIB_PER_MIB: usize = 1024;
    5 * KIB_PER_MIB
}
111
/// Value used for the `concurrent_requests` option when it is not provided.
fn default_concurrent_requests() -> usize {
    1024_usize
}
115
116impl TryFrom<&ElasticsearchConnection> for ElasticSearchOpenSearchConfig {
117    type Error = ConnectorError;
118
119    fn try_from(value: &ElasticsearchConnection) -> std::result::Result<Self, Self::Error> {
120        let allowed_fields: HashSet<&str> = hashset!["url", "username", "password"]; // from ElasticsearchOpenSearchConfig
121
122        for k in value.0.keys() {
123            if !allowed_fields.contains(k.as_str()) {
124                return Err(ConnectorError::from(anyhow!(
125                    "Invalid field: {}, allowed fields: {:?}",
126                    k,
127                    allowed_fields
128                )));
129            }
130        }
131
132        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
133            serde_json::to_value(value.0.clone()).unwrap(),
134        )
135        .map_err(|e| SinkError::Config(anyhow!(e)))?;
136        Ok(config)
137    }
138}
139
140impl ElasticSearchOpenSearchConfig {
141    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
142        let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
143            serde_json::to_value(properties).unwrap(),
144        )
145        .map_err(|e| SinkError::Config(anyhow!(e)))?;
146        Ok(config)
147    }
148
149    pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
150        let check_username_password = || -> Result<()> {
151            if self.username.is_some() && self.password.is_none() {
152                return Err(SinkError::Config(anyhow!(
153                    "please set the password when the username is set."
154                )));
155            }
156            if self.username.is_none() && self.password.is_some() {
157                return Err(SinkError::Config(anyhow!(
158                    "please set the username when the password is set."
159                )));
160            }
161            Ok(())
162        };
163        let url =
164            Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
165        if connector.eq(ES_SINK) {
166            let mut transport_builder = elasticsearch::http::transport::TransportBuilder::new(
167                elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
168            );
169            if let Some(username) = &self.username
170                && let Some(password) = &self.password
171            {
172                transport_builder = transport_builder.auth(
173                    elasticsearch::auth::Credentials::Basic(username.clone(), password.clone()),
174                );
175            }
176            check_username_password()?;
177            let transport = transport_builder
178                .build()
179                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
180            let client = elasticsearch::Elasticsearch::new(transport);
181            Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
182        } else if connector.eq(OPENSEARCH_SINK) {
183            let mut transport_builder = opensearch::http::transport::TransportBuilder::new(
184                opensearch::http::transport::SingleNodeConnectionPool::new(url),
185            );
186            if let Some(username) = &self.username
187                && let Some(password) = &self.password
188            {
189                transport_builder = transport_builder.auth(opensearch::auth::Credentials::Basic(
190                    username.clone(),
191                    password.clone(),
192                ));
193            }
194            check_username_password()?;
195            let transport = transport_builder
196                .build()
197                .map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
198            let client = opensearch::OpenSearch::new(transport);
199            Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
200        } else {
201            panic!(
202                "connector type must be {} or {}, but get {}",
203                ES_SINK, OPENSEARCH_SINK, connector
204            );
205        }
206    }
207
208    pub fn validate_config(&self, schema: &Schema) -> Result<()> {
209        if self.index_column.is_some() && self.index.is_some()
210            || self.index_column.is_none() && self.index.is_none()
211        {
212            return Err(SinkError::Config(anyhow!(
213                "please set only one of the 'index_column' or 'index' properties."
214            )));
215        }
216
217        if let Some(index_column) = &self.index_column {
218            let filed = schema
219                .fields()
220                .iter()
221                .find(|f| &f.name == index_column)
222                .unwrap();
223            if filed.data_type() != DataType::Varchar {
224                return Err(SinkError::Config(anyhow!(
225                    "please ensure the data type of {} is varchar.",
226                    index_column
227                )));
228            }
229        }
230
231        if let Some(routing_column) = &self.routing_column {
232            let filed = schema
233                .fields()
234                .iter()
235                .find(|f| &f.name == routing_column)
236                .unwrap();
237            if filed.data_type() != DataType::Varchar {
238                return Err(SinkError::Config(anyhow!(
239                    "please ensure the data type of {} is varchar.",
240                    routing_column
241                )));
242            }
243        }
244        Ok(())
245    }
246
247    pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
248        let index_column_idx = self
249            .index_column
250            .as_ref()
251            .map(|n| {
252                schema
253                    .fields()
254                    .iter()
255                    .position(|s| &s.name == n)
256                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
257            })
258            .transpose()?;
259        Ok(index_column_idx)
260    }
261
262    pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
263        let routing_column_idx = self
264            .routing_column
265            .as_ref()
266            .map(|n| {
267                schema
268                    .fields()
269                    .iter()
270                    .position(|s| &s.name == n)
271                    .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
272            })
273            .transpose()?;
274        Ok(routing_column_idx)
275    }
276}